// blob: 40c1b64122e0d12efe90f56962cd9b11504e3eed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.Type;
import static java.util.Collections.emptyIterator;
public class ParquetValueReaders {
// Private constructor: this class only exposes static factories and nested reader types.
private ParquetValueReaders() {
}
/**
 * Wraps a reader for use with a field that may be optional.
 *
 * @param type the Parquet type of the field
 * @param definitionLevel the definition level handed to the wrapping reader
 * @param reader the reader that produces the field's values
 * @param <T> the Java type produced by the reader
 * @return an option reader around {@code reader} when {@code type} has OPTIONAL repetition,
 *         otherwise {@code reader} itself
 */
public static <T> ParquetValueReader<T> option(Type type, int definitionLevel,
                                               ParquetValueReader<T> reader) {
  boolean optional = type.isRepetition(Type.Repetition.OPTIONAL);
  if (!optional) {
    // required (or repeated) fields need no wrapping
    return reader;
  }

  return new OptionReader<>(definitionLevel, reader);
}
/**
 * Returns the shared reader whose {@code read} always yields {@code null}.
 *
 * @param <T> the nominal value type of the returned reader
 * @return the singleton null reader, cast to the requested type
 */
@SuppressWarnings("unchecked")
public static <T> ParquetValueReader<T> nulls() {
  // the singleton never produces a value, so the unchecked cast cannot leak a wrongly typed object
  ParquetValueReader<?> shared = NullReader.INSTANCE;
  return (ParquetValueReader<T>) shared;
}
/**
 * Creates a reader that returns {@code value} from every call to {@code read}.
 *
 * @param value the constant to return; stored as-is
 * @param <C> the type of the constant
 * @return a new constant reader
 */
public static <C> ParquetValueReader<C> constant(C value) {
  ConstantReader<C> constantReader = new ConstantReader<>(value);
  return constantReader;
}
/**
 * Creates a reader that produces row positions instead of column values.
 *
 * @return a new position reader
 */
public static ParquetValueReader<Long> position() {
  PositionReader positionReader = new PositionReader();
  return positionReader;
}
/**
* Reader that always returns null and exposes no Parquet columns.
* <p>
* Its COLUMNS and NULL_COLUMN constants are also returned by the other column-less readers in
* this file (ConstantReader and PositionReader).
*/
private static class NullReader<T> implements ParquetValueReader<T> {
// Single shared instance; nulls() hands it out with an unchecked cast.
private static final NullReader<Void> INSTANCE = new NullReader<>();
// Empty, immutable column list returned from columns().
private static final ImmutableList<TripleIterator<?>> COLUMNS = ImmutableList.of();
// Placeholder iterator with no values: both levels are reported as 0, hasNext() is false, and
// next()/nextNull() return null. StructReader compares against this instance by identity.
private static final TripleIterator<?> NULL_COLUMN = new TripleIterator<Object>() {
@Override
public int currentDefinitionLevel() {
return 0;
}
@Override
public int currentRepetitionLevel() {
return 0;
}
@Override
public <N> N nextNull() {
return null;
}
@Override
public boolean hasNext() {
return false;
}
@Override
public Object next() {
return null;
}
};
// Private: the only instance is INSTANCE.
private NullReader() {
}
// Ignores the reuse argument; the result is always null.
@Override
public T read(T reuse) {
return null;
}
@Override
public TripleIterator<?> column() {
return NULL_COLUMN;
}
@Override
public List<TripleIterator<?>> columns() {
return COLUMNS;
}
// No-op: this reader does not read from the page store.
@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
}
}
/**
 * Reader that hands back one fixed value on every read and is not backed by a Parquet column.
 *
 * @param <C> the type of the constant
 */
static class ConstantReader<C> implements ParquetValueReader<C> {
  private final C value;

  ConstantReader(C constantValue) {
    this.value = constantValue;
  }

  /** Returns the constant; {@code reuse} is ignored. */
  @Override
  public C read(C reuse) {
    return value;
  }

  /** Returns the shared empty column list. */
  @Override
  public List<TripleIterator<?>> columns() {
    return NullReader.COLUMNS;
  }

  /** Returns the shared placeholder column. */
  @Override
  public TripleIterator<?> column() {
    return NullReader.NULL_COLUMN;
  }

  @Override
  public void setPageSource(PageReadStore pageStore, long rowPosition) {
    // nothing to bind: no column is read
  }
}
/**
 * Reader that produces a running position: the row position given to {@code setPageSource} plus
 * the number of {@code read} calls made since then, minus one.
 */
static class PositionReader implements ParquetValueReader<Long> {
  private long rowGroupStart;
  // -1 so that the first read after (re)binding yields an offset of 0
  private long rowOffset = -1;

  /** Advances the offset by one and returns start + offset; {@code reuse} is ignored. */
  @Override
  public Long read(Long reuse) {
    rowOffset += 1;
    return rowGroupStart + rowOffset;
  }

  /** Returns the shared empty column list. */
  @Override
  public List<TripleIterator<?>> columns() {
    return NullReader.COLUMNS;
  }

  /** Returns the shared placeholder column. */
  @Override
  public TripleIterator<?> column() {
    return NullReader.NULL_COLUMN;
  }

  /** Records the new starting position and resets the offset; the page store is not used. */
  @Override
  public void setPageSource(PageReadStore pageStore, long rowPosition) {
    this.rowOffset = -1;
    this.rowGroupStart = rowPosition;
  }
}
/**
 * Base class for readers that pull values from exactly one Parquet column.
 * <p>
 * Subclasses read through the protected {@code column} iterator.
 *
 * @param <T> the Java type produced by the reader
 */
public abstract static class PrimitiveReader<T> implements ParquetValueReader<T> {
  private final ColumnDescriptor descriptor;

  @SuppressWarnings("checkstyle:VisibilityModifier")
  protected final ColumnIterator<?> column;

  private final List<TripleIterator<?>> singleColumn;

  /**
   * Creates the column iterator for the given descriptor.
   *
   * @param desc descriptor of the column this reader is bound to
   */
  protected PrimitiveReader(ColumnDescriptor desc) {
    this.descriptor = desc;
    this.column = ColumnIterator.newIterator(desc, "");
    this.singleColumn = ImmutableList.of(column);
  }

  /** Returns this reader's column iterator. */
  @Override
  public TripleIterator<?> column() {
    return column;
  }

  /** Returns a one-element list holding this reader's column iterator. */
  @Override
  public List<TripleIterator<?>> columns() {
    return singleColumn;
  }

  /** Points the column iterator at the page reader for this column; rowPosition is unused. */
  @Override
  public void setPageSource(PageReadStore pageStore, long rowPosition) {
    column.setPageSource(pageStore.getPageReader(descriptor));
  }
}
/**
 * Primitive reader that, besides the generic {@code read}, exposes typed accessors that return
 * the column's next value without boxing.
 *
 * @param <T> the Java type produced by the generic {@code read}
 */
public static class UnboxedReader<T> extends PrimitiveReader<T> {
  public UnboxedReader(ColumnDescriptor desc) {
    super(desc);
  }

  /** Returns the column's next value cast to {@code T}; the argument is ignored. */
  @Override
  @SuppressWarnings("unchecked")
  public T read(T ignored) {
    Object next = column.next();
    return (T) next;
  }

  /** Returns the column's next value as a boolean. */
  public boolean readBoolean() {
    return column.nextBoolean();
  }

  /** Returns the column's next value as an int. */
  public int readInteger() {
    return column.nextInteger();
  }

  /** Returns the column's next value as a long. */
  public long readLong() {
    return column.nextLong();
  }

  /** Returns the column's next value as a float. */
  public float readFloat() {
    return column.nextFloat();
  }

  /** Returns the column's next value as a double. */
  public double readDouble() {
    return column.nextDouble();
  }

  /** Returns the column's next value as a Binary. */
  public Binary readBinary() {
    return column.nextBinary();
  }
}
/** Reads the column's next binary value and decodes it as a UTF-8 string. */
public static class StringReader extends PrimitiveReader<String> {
  public StringReader(ColumnDescriptor desc) {
    super(desc);
  }

  /** Returns a newly decoded string; {@code reuse} is ignored. */
  @Override
  public String read(String reuse) {
    Binary utf8 = column.nextBinary();
    return utf8.toStringUsingUTF8();
  }
}
/** Reads int values from the column and widens them to long. */
public static class IntAsLongReader extends UnboxedReader<Long> {
  public IntAsLongReader(ColumnDescriptor desc) {
    super(desc);
  }

  /** Returns the widened value, boxed; the argument is ignored. */
  @Override
  public Long read(Long ignored) {
    long widened = readLong();
    return widened;
  }

  /** Reads the next int through the superclass and returns it as a long. */
  @Override
  public long readLong() {
    int narrow = super.readInteger();
    return narrow;
  }
}
/** Reads float values from the column and widens them to double. */
public static class FloatAsDoubleReader extends UnboxedReader<Double> {
  public FloatAsDoubleReader(ColumnDescriptor desc) {
    super(desc);
  }

  /** Returns the widened value, boxed; the argument is ignored. */
  @Override
  public Double read(Double ignored) {
    double widened = readDouble();
    return widened;
  }

  /** Reads the next float through the superclass and returns it as a double. */
  @Override
  public double readDouble() {
    float narrow = super.readFloat();
    return narrow;
  }
}
/** Reads int values and interprets them as the unscaled value of a decimal with a fixed scale. */
public static class IntegerAsDecimalReader extends PrimitiveReader<BigDecimal> {
  private final int scale;

  /**
   * @param desc descriptor of the column to read
   * @param scale the scale applied to every value read
   */
  public IntegerAsDecimalReader(ColumnDescriptor desc, int scale) {
    super(desc);
    this.scale = scale;
  }

  /** Returns a new BigDecimal built from the next int; the argument is ignored. */
  @Override
  public BigDecimal read(BigDecimal ignored) {
    BigInteger unscaled = BigInteger.valueOf(column.nextInteger());
    return new BigDecimal(unscaled, scale);
  }
}
/** Reads long values and interprets them as the unscaled value of a decimal with a fixed scale. */
public static class LongAsDecimalReader extends PrimitiveReader<BigDecimal> {
  private final int scale;

  /**
   * @param desc descriptor of the column to read
   * @param scale the scale applied to every value read
   */
  public LongAsDecimalReader(ColumnDescriptor desc, int scale) {
    super(desc);
    this.scale = scale;
  }

  /** Returns a new BigDecimal built from the next long; the argument is ignored. */
  @Override
  public BigDecimal read(BigDecimal ignored) {
    BigInteger unscaled = BigInteger.valueOf(column.nextLong());
    return new BigDecimal(unscaled, scale);
  }
}
/**
 * Reads binary values and interprets their bytes as the two's-complement unscaled value of a
 * decimal with a fixed scale.
 */
public static class BinaryAsDecimalReader extends PrimitiveReader<BigDecimal> {
  // final, like the scale field of the int- and long-backed decimal readers: set once, never changed
  private final int scale;

  /**
   * @param desc descriptor of the column to read
   * @param scale the scale applied to every value read
   */
  public BinaryAsDecimalReader(ColumnDescriptor desc, int scale) {
    super(desc);
    this.scale = scale;
  }

  /**
   * Returns a new BigDecimal built from the next binary value.
   *
   * @param reuse ignored; a new BigDecimal is always created
   * @return the decimal whose unscaled value is given by the binary's bytes
   */
  @Override
  public BigDecimal read(BigDecimal reuse) {
    // NOTE(review): getBytesUnsafe presumably may expose the binary's backing array without a copy;
    // the array is only read here to construct the BigInteger and is not retained by this reader.
    byte[] bytes = column.nextBinary().getBytesUnsafe();
    return new BigDecimal(new BigInteger(bytes), scale);
  }
}
/** Reads binary values into a heap ByteBuffer, reusing the caller's buffer when it is large enough. */
public static class BytesReader extends PrimitiveReader<ByteBuffer> {
  public BytesReader(ColumnDescriptor desc) {
    super(desc);
  }

  /**
   * Copies the next binary value into a ByteBuffer.
   *
   * @param reuse an optional buffer to copy into; it is used only when it is non-null, is backed
   *              by an accessible array, and has capacity for the whole value
   * @return {@code reuse} with position 0 and limit equal to the value length when it was used,
   *         otherwise a new buffer wrapping a freshly allocated array of exactly the value length
   */
  @Override
  public ByteBuffer read(ByteBuffer reuse) {
    Binary binary = column.nextBinary();
    ByteBuffer data = binary.toByteBuffer();

    // Capture the length up front: the relative bulk get below advances data's position, so
    // data.remaining() is 0 afterwards and must not be used to set the limit of the result.
    int length = data.remaining();

    if (reuse != null && reuse.hasArray() && reuse.capacity() >= length) {
      data.get(reuse.array(), reuse.arrayOffset(), length);
      reuse.position(0);
      reuse.limit(length);
      return reuse;
    }

    byte[] array = new byte[length];
    data.get(array, 0, length);
    return ByteBuffer.wrap(array);
  }
}
/** Reads the column's next binary value as a byte array. */
public static class ByteArrayReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
  public ByteArrayReader(ColumnDescriptor desc) {
    super(desc);
  }

  /** Returns the bytes of the next binary value; the argument is ignored. */
  @Override
  public byte[] read(byte[] ignored) {
    Binary binary = column.nextBinary();
    return binary.getBytes();
  }
}
/**
 * Wrapper that returns null instead of delegating when the wrapped reader's column reports a
 * definition level at or below this reader's level.
 *
 * @param <T> the Java type produced by the wrapped reader
 */
private static class OptionReader<T> implements ParquetValueReader<T> {
  private final int definitionLevel;
  private final ParquetValueReader<T> reader;
  private final TripleIterator<?> column;
  private final List<TripleIterator<?>> children;

  OptionReader(int definitionLevel, ParquetValueReader<T> reader) {
    this.definitionLevel = definitionLevel;
    this.reader = reader;
    // column and columns are taken directly from the wrapped reader
    this.column = reader.column();
    this.children = reader.columns();
  }

  @Override
  public TripleIterator<?> column() {
    return column;
  }

  @Override
  public List<TripleIterator<?>> columns() {
    return children;
  }

  /** Forwards the page source to the wrapped reader. */
  @Override
  public void setPageSource(PageReadStore pageStore, long rowPosition) {
    reader.setPageSource(pageStore, rowPosition);
  }

  /**
   * Delegates to the wrapped reader when the current definition level is above this reader's
   * level; otherwise calls {@code nextNull()} on every child column and returns null.
   */
  @Override
  public T read(T reuse) {
    if (column.currentDefinitionLevel() <= definitionLevel) {
      for (TripleIterator<?> child : children) {
        child.nextNull();
      }
      return null;
    }

    return reader.read(reuse);
  }
}
/**
* Base reader for repeated values: elements are read one at a time with a single element reader
* and collected through the abstract hooks declared at the bottom of this class.
*
* @param <T> the type returned by read
* @param <I> the intermediate container used while elements are being collected
* @param <E> the element type produced by the wrapped reader
*/
public abstract static class RepeatedReader<T, I, E> implements ParquetValueReader<T> {
private final int definitionLevel;
private final int repetitionLevel;
private final ParquetValueReader<E> reader;
private final TripleIterator<?> column;
private final List<TripleIterator<?>> children;
// The column and child columns are taken directly from the element reader.
protected RepeatedReader(int definitionLevel, int repetitionLevel, ParquetValueReader<E> reader) {
this.definitionLevel = definitionLevel;
this.repetitionLevel = repetitionLevel;
this.reader = reader;
this.column = reader.column();
this.children = reader.columns();
}
// Forwards the page source to the element reader.
@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
reader.setPageSource(pageStore, rowPosition);
}
@Override
public TripleIterator<?> column() {
return column;
}
/**
* Reads one repeated value.
* <p>
* The body always runs at least once. While the column's current definition level is above this
* reader's level an element is read and added; the loop continues as long as the column's
* current repetition level stays above this reader's repetition level.
*
* @param reuse passed to newListData so that an implementation may recycle containers
* @return the result of buildList on the collected elements
*/
@Override
public T read(T reuse) {
I intermediate = newListData(reuse);
do {
if (column.currentDefinitionLevel() > definitionLevel) {
// getElement supplies the object offered to the element reader for reuse
addElement(intermediate, reader.read(getElement(intermediate)));
} else {
// consume the empty list triple
for (TripleIterator<?> child : children) {
child.nextNull();
}
// if the current definition level is equal to the definition level of this repeated type,
// then the result is an empty list and the repetition level will always be <= rl.
break;
}
} while (column.currentRepetitionLevel() > repetitionLevel);
return buildList(intermediate);
}
@Override
public List<TripleIterator<?>> columns() {
return children;
}
// Creates the intermediate container for one read call; reuse is whatever was passed to read.
protected abstract I newListData(T reuse);
// Supplies the object passed to the element reader as its reuse argument.
protected abstract E getElement(I list);
// Adds one element that was just read to the intermediate container.
protected abstract void addElement(I list, E element);
// Converts the intermediate container into the value returned from read.
protected abstract T buildList(I list);
}
/**
 * Repeated reader that collects elements into a {@code java.util.List}.
 * <p>
 * The list passed as {@code reuse} to one read is remembered and, on the following read, cleared
 * and used as the result container; its former elements are offered to the element reader for
 * reuse during the read in which it was passed.
 *
 * @param <E> the element type
 */
public static class ListReader<E> extends RepeatedReader<List<E>, List<E>, E> {
  private List<E> lastList = null;
  private Iterator<E> elements = null;

  public ListReader(int definitionLevel, int repetitionLevel,
                    ParquetValueReader<E> reader) {
    super(definitionLevel, repetitionLevel, reader);
  }

  @Override
  protected List<E> newListData(List<E> reuse) {
    // result container: the list remembered from the previous call, emptied, or a new one
    List<E> result = lastList;
    if (result == null) {
      result = Lists.newArrayList();
    } else {
      result.clear();
    }

    // remember this call's reuse list (possibly null) and iterate its elements for reuse
    this.lastList = reuse;
    if (reuse == null) {
      this.elements = emptyIterator();
    } else {
      this.elements = reuse.iterator();
    }

    return result;
  }

  /** Returns the next element of the reuse list, or null once it is exhausted. */
  @Override
  protected E getElement(List<E> reuse) {
    return elements.hasNext() ? elements.next() : null;
  }

  @Override
  protected void addElement(List<E> list, E element) {
    list.add(element);
  }

  /** The intermediate list is returned as the final result, unchanged. */
  @Override
  protected List<E> buildList(List<E> list) {
    return list;
  }
}
/**
* Base reader for repeated key/value pairs: each pair is read with a key reader and a value
* reader and collected through the abstract hooks declared at the bottom of this class.
*
* @param <M> the type returned by read
* @param <I> the intermediate container used while pairs are being collected
* @param <K> the key type produced by the key reader
* @param <V> the value type produced by the value reader
*/
public abstract static class RepeatedKeyValueReader<M, I, K, V> implements ParquetValueReader<M> {
private final int definitionLevel;
private final int repetitionLevel;
private final ParquetValueReader<K> keyReader;
private final ParquetValueReader<V> valueReader;
private final TripleIterator<?> column;
private final List<TripleIterator<?>> children;
// Levels are checked on the key reader's column; children holds the key reader's columns
// followed by the value reader's columns.
protected RepeatedKeyValueReader(int definitionLevel, int repetitionLevel,
ParquetValueReader<K> keyReader, ParquetValueReader<V> valueReader) {
this.definitionLevel = definitionLevel;
this.repetitionLevel = repetitionLevel;
this.keyReader = keyReader;
this.valueReader = valueReader;
this.column = keyReader.column();
this.children = ImmutableList.<TripleIterator<?>>builder()
.addAll(keyReader.columns())
.addAll(valueReader.columns())
.build();
}
// Forwards the page source to both the key reader and the value reader.
@Override
public void setPageSource(PageReadStore pageStore, long rowPosition) {
keyReader.setPageSource(pageStore, rowPosition);
valueReader.setPageSource(pageStore, rowPosition);
}
@Override
public TripleIterator<?> column() {
return column;
}
/**
* Reads one repeated key/value collection.
* <p>
* The body always runs at least once. While the key column's current definition level is above
* this reader's level a pair is read (key first, then value) and added; the loop continues as
* long as the key column's current repetition level stays above this reader's repetition level.
*
* @param reuse passed to newMapData so that an implementation may recycle containers
* @return the result of buildMap on the collected pairs
*/
@Override
public M read(M reuse) {
I intermediate = newMapData(reuse);
do {
if (column.currentDefinitionLevel() > definitionLevel) {
// the pair from getPair supplies the objects offered to the key and value readers for reuse
Map.Entry<K, V> pair = getPair(intermediate);
addPair(intermediate, keyReader.read(pair.getKey()), valueReader.read(pair.getValue()));
} else {
// consume the empty map triple
for (TripleIterator<?> child : children) {
child.nextNull();
}
// if the current definition level is equal to the definition level of this repeated type,
// then the result is an empty map and the repetition level will always be <= rl.
break;
}
} while (column.currentRepetitionLevel() > repetitionLevel);
return buildMap(intermediate);
}
@Override
public List<TripleIterator<?>> columns() {
return children;
}
// Creates the intermediate container for one read call; reuse is whatever was passed to read.
protected abstract I newMapData(M reuse);
// Supplies the entry whose key and value are passed to the readers as reuse arguments.
protected abstract Map.Entry<K, V> getPair(I map);
// Adds one key/value pair that was just read to the intermediate container.
protected abstract void addPair(I map, K key, V value);
// Converts the intermediate container into the value returned from read.
protected abstract M buildMap(I map);
}
/**
 * Repeated key/value reader that collects pairs into a {@code java.util.Map}.
 * <p>
 * The map passed as {@code reuse} to one read is remembered and, on the following read, cleared
 * and used as the result container; its former entries are offered to the key and value readers
 * for reuse during the read in which it was passed.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public static class MapReader<K, V> extends RepeatedKeyValueReader<Map<K, V>, Map<K, V>, K, V> {
  // entry with null key and value, handed out when there is nothing left to reuse
  private final ReusableEntry<K, V> nullEntry = new ReusableEntry<>();
  private Map<K, V> lastMap = null;
  private Iterator<Map.Entry<K, V>> pairs = null;

  public MapReader(int definitionLevel, int repetitionLevel,
                   ParquetValueReader<K> keyReader,
                   ParquetValueReader<V> valueReader) {
    super(definitionLevel, repetitionLevel, keyReader, valueReader);
  }

  @Override
  protected Map<K, V> newMapData(Map<K, V> reuse) {
    // result container: the map remembered from the previous call, emptied, or a new one
    Map<K, V> result = lastMap;
    if (result == null) {
      result = Maps.newLinkedHashMap();
    } else {
      result.clear();
    }

    // remember this call's reuse map (possibly null) and iterate its entries for reuse
    this.lastMap = reuse;
    if (reuse == null) {
      this.pairs = emptyIterator();
    } else {
      this.pairs = reuse.entrySet().iterator();
    }

    return result;
  }

  /** Returns the next entry of the reuse map, or the shared null entry once it is exhausted. */
  @Override
  protected Map.Entry<K, V> getPair(Map<K, V> map) {
    if (!pairs.hasNext()) {
      return nullEntry;
    }
    return pairs.next();
  }

  @Override
  protected void addPair(Map<K, V> map, K key, V value) {
    map.put(key, value);
  }

  /** The intermediate map is returned as the final result, unchanged. */
  @Override
  protected Map<K, V> buildMap(Map<K, V> map) {
    return map;
  }
}
/**
 * Mutable {@code Map.Entry} whose key and value both start as null and can be replaced together
 * through {@code set}.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public static class ReusableEntry<K, V> implements Map.Entry<K, V> {
  private K key = null;
  private V value = null;

  /**
   * Replaces both the key and the value.
   *
   * @param newKey the key to hold
   * @param newValue the value to hold
   */
  public void set(K newKey, V newValue) {
    key = newKey;
    value = newValue;
  }

  @Override
  public K getKey() {
    return key;
  }

  @Override
  public V getValue() {
    return value;
  }

  /**
   * Replaces the value only.
   *
   * @param newValue the value to hold
   * @return the value held before this call
   */
  @Override
  public V setValue(V newValue) {
    V previous = value;
    value = newValue;
    return previous;
  }
}
/**
* Base reader for struct values. Each field is read by its own reader, in order, and stored into
* an intermediate struct through the abstract hooks declared below.
*
* @param <T> the struct type returned by read
* @param <I> the intermediate struct type that fields are set into before buildStruct is called
*/
public abstract static class StructReader<T, I> implements ParquetValueReader<T> {
// Callback shape for reading and storing one field; instances are created by newSetter.
private interface Setter<R> {
void set(R record, int pos, Object reuse);
}
private final ParquetValueReader<?>[] readers;
private final TripleIterator<?> column;
private final List<TripleIterator<?>> children;
/**
* Creates a struct reader from per-field types and readers.
*
* @param types field types; types.get(i) is paired with readers.get(i)
* @param readers field readers, in the order in which fields are read and set
*/
@SuppressWarnings("unchecked")
protected StructReader(List<Type> types, List<ParquetValueReader<?>> readers) {
this.readers = (ParquetValueReader<?>[]) Array.newInstance(
ParquetValueReader.class, readers.size());
// NOTE(review): columns and setters are filled in the loop below but are locals that are never
// stored or read again in this class; only this.readers and the flattened children list are kept.
TripleIterator<?>[] columns = (TripleIterator<?>[]) Array.newInstance(TripleIterator.class, readers.size());
Setter<I>[] setters = (Setter<I>[]) Array.newInstance(Setter.class, readers.size());
ImmutableList.Builder<TripleIterator<?>> columnsBuilder = ImmutableList.builder();
for (int i = 0; i < readers.size(); i += 1) {
ParquetValueReader<?> reader = readers.get(i);
this.readers[i] = readers.get(i);
columns[i] = reader.column();
setters[i] = newSetter(reader, types.get(i));
columnsBuilder.addAll(reader.columns());
}
this.children = columnsBuilder.build();
// the struct's own column is the first child column that is not the NULL_COLUMN placeholder
this.column = firstNonNullColumn(children);
}
// Forwards the page source to every field reader.
@Override
public final void setPageSource(PageReadStore pageStore, long rowPosition) {
for (int i = 0; i < readers.length; i += 1) {
readers[i].setPageSource(pageStore, rowPosition);
}
}
@Override
public final TripleIterator<?> column() {
return column;
}
/**
* Reads one struct: for each field position, the current field value of the intermediate struct
* is offered to that field's reader for reuse and the result is stored with set.
*
* @param reuse passed to newStructData so that an implementation may recycle a struct
* @return the result of buildStruct on the populated intermediate struct
*/
@Override
public final T read(T reuse) {
I intermediate = newStructData(reuse);
for (int i = 0; i < readers.length; i += 1) {
set(intermediate, i, readers[i].read(get(intermediate, i)));
// NOTE(review): the disabled line below is the alternative that goes through the Setter callbacks
// setters[i].set(intermediate, i, get(intermediate, i));
}
return buildStruct(intermediate);
}
@Override
public List<TripleIterator<?>> columns() {
return children;
}
/**
* Builds the Setter for one field.
* <p>
* For an UnboxedReader over a primitive type the setter reads through the matching unboxed
* accessor and stores through the matching typed set method; INT96, FIXED_LEN_BYTE_ARRAY and
* BINARY all read a Binary and store it through set. Every other reader gets a setter that calls
* read with the reuse object and stores the result through set, or setNull when it is null.
*
* @throws UnsupportedOperationException if the primitive type name matches none of the cases
*/
@SuppressWarnings("unchecked")
private <E> Setter<I> newSetter(ParquetValueReader<E> reader, Type type) {
if (reader instanceof UnboxedReader && type.isPrimitive()) {
UnboxedReader<?> unboxed = (UnboxedReader<?>) reader;
switch (type.asPrimitiveType().getPrimitiveTypeName()) {
case BOOLEAN:
return (record, pos, ignored) -> setBoolean(record, pos, unboxed.readBoolean());
case INT32:
return (record, pos, ignored) -> setInteger(record, pos, unboxed.readInteger());
case INT64:
return (record, pos, ignored) -> setLong(record, pos, unboxed.readLong());
case FLOAT:
return (record, pos, ignored) -> setFloat(record, pos, unboxed.readFloat());
case DOUBLE:
return (record, pos, ignored) -> setDouble(record, pos, unboxed.readDouble());
case INT96:
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return (record, pos, ignored) -> set(record, pos, unboxed.readBinary());
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
// TODO: Add support for options to avoid the null check
return (record, pos, reuse) -> {
Object obj = reader.read((E) reuse);
if (obj != null) {
set(record, pos, obj);
} else {
setNull(record, pos);
}
};
}
// Typed view over getField; the unchecked cast lets the result be passed to any field reader.
@SuppressWarnings("unchecked")
private <E> E get(I intermediate, int pos) {
return (E) getField(intermediate, pos);
}
// Creates the intermediate struct for one read call; reuse is whatever was passed to read.
protected abstract I newStructData(T reuse);
// Returns the current value at pos in the intermediate struct, offered to the reader for reuse.
protected abstract Object getField(I intermediate, int pos);
// Converts the populated intermediate struct into the value returned from read.
protected abstract T buildStruct(I struct);
/**
* Used to set a struct value by position.
* <p>
* To avoid boxing, override {@link #setInteger(Object, int, int)} and similar methods.
*
* @param struct a struct object created by {@link #newStructData(Object)}
* @param pos the position in the struct to set
* @param value the value to set
*/
protected abstract void set(I struct, int pos, Object value);
// The typed setters below all default to the generic set (boxing primitive values); setNull
// stores null.
protected void setNull(I struct, int pos) {
set(struct, pos, null);
}
protected void setBoolean(I struct, int pos, boolean value) {
set(struct, pos, value);
}
protected void setInteger(I struct, int pos, int value) {
set(struct, pos, value);
}
protected void setLong(I struct, int pos, long value) {
set(struct, pos, value);
}
protected void setFloat(I struct, int pos, float value) {
set(struct, pos, value);
}
protected void setDouble(I struct, int pos, double value) {
set(struct, pos, value);
}
/**
* Find the first column that is not the NULL_COLUMN placeholder, or return NULL_COLUMN if every
* column is the placeholder (or the list is empty). The comparison is by identity.
*
* @param columns a collection of triple iterator columns
* @return the first column in columns that is not NULL_COLUMN, otherwise NULL_COLUMN
*/
private TripleIterator<?> firstNonNullColumn(List<TripleIterator<?>> columns) {
for (TripleIterator<?> col : columns) {
if (col != NullReader.NULL_COLUMN) {
return col;
}
}
return NullReader.NULL_COLUMN;
}
}
}