/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.arrow.vectorized;

import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.iceberg.arrow.ArrowSchemaUtil;
import org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.PrimitiveType;

/**
 * {@link VectorizedReader VectorReader(s)} that read a batch of values into Arrow vectors. It also takes care of
 * allocating the right kind of Arrow vector depending on the corresponding Iceberg/Parquet data type.
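 *
 * <p>A minimal usage sketch; the column, schema, and row-group objects below are illustrative
 * stand-ins for whatever the caller already has:
 * <pre>
 *   VectorizedArrowReader reader =
 *       new VectorizedArrowReader(columnDescriptor, icebergField, allocator, setArrowValidityVector);
 *   reader.setRowGroupInfo(pageStore, columnMetadataByPath);
 *   reader.setBatchSize(VectorizedArrowReader.DEFAULT_BATCH_SIZE);
 *   VectorHolder batch = reader.read(null, VectorizedArrowReader.DEFAULT_BATCH_SIZE);
 *   reader.close();
 * </pre>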
*/
public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
public static final int DEFAULT_BATCH_SIZE = 5000;
private static final Integer UNKNOWN_WIDTH = null;
private static final int AVERAGE_VARIABLE_WIDTH_RECORD_SIZE = 10;
private final ColumnDescriptor columnDescriptor;
private final VectorizedColumnIterator vectorizedColumnIterator;
private final Types.NestedField icebergField;
private final BufferAllocator rootAlloc;
private int batchSize;
private FieldVector vec;
private Integer typeWidth;
private ReadType readType;
private NullabilityHolder nullabilityHolder;
  // In cases where Parquet falls back from dictionary to plain encoding, we eagerly decode the
  // dictionary-encoded pages before storing the values in the Arrow vector. This means that even if a
  // dictionary is present, the data in the vector may not necessarily be dictionary encoded.
private Dictionary dictionary;
public VectorizedArrowReader(
ColumnDescriptor desc,
Types.NestedField icebergField,
BufferAllocator ra,
boolean setArrowValidityVector) {
this.icebergField = icebergField;
this.columnDescriptor = desc;
this.rootAlloc = ra;
this.vectorizedColumnIterator = new VectorizedColumnIterator(desc, "", setArrowValidityVector);
}
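
  // Used by the NullVectorReader and ConstantVectorReader subclasses below, which never read from a
  // file and therefore need no column descriptor, iterator, or allocator.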
private VectorizedArrowReader() {
this.icebergField = null;
this.batchSize = DEFAULT_BATCH_SIZE;
this.columnDescriptor = null;
this.rootAlloc = null;
this.vectorizedColumnIterator = null;
}
private enum ReadType {
FIXED_LENGTH_DECIMAL,
INT_BACKED_DECIMAL,
LONG_BACKED_DECIMAL,
VARCHAR,
VARBINARY,
FIXED_WIDTH_BINARY,
BOOLEAN,
INT,
LONG,
FLOAT,
DOUBLE,
TIMESTAMP_MILLIS,
DICTIONARY
}
@Override
public void setBatchSize(int batchSize) {
    this.batchSize = (batchSize == 0) ? DEFAULT_BATCH_SIZE : batchSize;
    this.vectorizedColumnIterator.setBatchSize(this.batchSize);
}
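
  /**
   * Reads the next batch of numValsToRead values into this reader's Arrow vector and returns it
   * wrapped in a {@link VectorHolder}, reusing the previously allocated vector whenever possible.
   */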
@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
boolean dictEncoded = vectorizedColumnIterator.producesDictionaryEncodedVector();
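    // Reallocate when there is nothing to reuse or when the encoding changed between batches:
    // dictionary ids are read into an IntVector, while decoded values use the column's own Arrow type.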
if (reuse == null || (!dictEncoded && readType == ReadType.DICTIONARY) ||
(dictEncoded && readType != ReadType.DICTIONARY)) {
allocateFieldVector(dictEncoded);
nullabilityHolder = new NullabilityHolder(batchSize);
} else {
vec.setValueCount(0);
nullabilityHolder.reset();
}
if (vectorizedColumnIterator.hasNext()) {
if (dictEncoded) {
vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
} else {
switch (readType) {
case FIXED_LENGTH_DECIMAL:
vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
break;
case INT_BACKED_DECIMAL:
vectorizedColumnIterator.nextBatchIntBackedDecimal(vec, nullabilityHolder);
break;
case LONG_BACKED_DECIMAL:
vectorizedColumnIterator.nextBatchLongBackedDecimal(vec, nullabilityHolder);
break;
          case VARBINARY:
          case VARCHAR:
            vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
            break;
case FIXED_WIDTH_BINARY:
vectorizedColumnIterator.nextBatchFixedWidthBinary(vec, typeWidth, nullabilityHolder);
break;
case BOOLEAN:
vectorizedColumnIterator.nextBatchBoolean(vec, nullabilityHolder);
break;
case INT:
vectorizedColumnIterator.nextBatchIntegers(vec, typeWidth, nullabilityHolder);
break;
case LONG:
vectorizedColumnIterator.nextBatchLongs(vec, typeWidth, nullabilityHolder);
break;
case FLOAT:
vectorizedColumnIterator.nextBatchFloats(vec, typeWidth, nullabilityHolder);
break;
case DOUBLE:
vectorizedColumnIterator.nextBatchDoubles(vec, typeWidth, nullabilityHolder);
break;
case TIMESTAMP_MILLIS:
vectorizedColumnIterator.nextBatchTimestampMillis(vec, typeWidth, nullabilityHolder);
break;
}
}
}
Preconditions.checkState(vec.getValueCount() == numValsToRead,
"Number of values read, %s, does not equal expected, %s", vec.getValueCount(), numValsToRead);
return new VectorHolder(columnDescriptor, vec, dictEncoded, dictionary,
nullabilityHolder, icebergField.type());
}
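
  /**
   * Allocates this reader's Arrow vector. A batch of dictionary ids is always read into an IntVector;
   * otherwise the vector type is derived from the column's Parquet primitive type and its logical
   * (original) type annotation.
   */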
private void allocateFieldVector(boolean dictionaryEncodedVector) {
if (dictionaryEncodedVector) {
Field field = new Field(
icebergField.name(),
new FieldType(icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null),
null);
this.vec = field.createVector(rootAlloc);
((IntVector) vec).allocateNew(batchSize);
this.typeWidth = (int) IntVector.TYPE_WIDTH;
this.readType = ReadType.DICTIONARY;
} else {
PrimitiveType primitive = columnDescriptor.getPrimitiveType();
Field arrowField = ArrowSchemaUtil.convert(icebergField);
if (primitive.getOriginalType() != null) {
switch (primitive.getOriginalType()) {
case ENUM:
case JSON:
case UTF8:
case BSON:
this.vec = arrowField.createVector(rootAlloc);
// TODO: Possibly use the uncompressed page size info to set the initial capacity
vec.setInitialCapacity(batchSize * AVERAGE_VARIABLE_WIDTH_RECORD_SIZE);
vec.allocateNewSafe();
this.readType = ReadType.VARCHAR;
this.typeWidth = UNKNOWN_WIDTH;
break;
case INT_8:
case INT_16:
case INT_32:
this.vec = arrowField.createVector(rootAlloc);
((IntVector) vec).allocateNew(batchSize);
this.readType = ReadType.INT;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
case DATE:
this.vec = arrowField.createVector(rootAlloc);
((DateDayVector) vec).allocateNew(batchSize);
this.readType = ReadType.INT;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
case INT_64:
this.vec = arrowField.createVector(rootAlloc);
((BigIntVector) vec).allocateNew(batchSize);
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case TIMESTAMP_MILLIS:
this.vec = arrowField.createVector(rootAlloc);
((BigIntVector) vec).allocateNew(batchSize);
this.readType = ReadType.TIMESTAMP_MILLIS;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case TIMESTAMP_MICROS:
this.vec = arrowField.createVector(rootAlloc);
((TimeStampMicroTZVector) vec).allocateNew(batchSize);
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case DECIMAL:
this.vec = arrowField.createVector(rootAlloc);
((DecimalVector) vec).allocateNew(batchSize);
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
this.readType = ReadType.FIXED_LENGTH_DECIMAL;
this.typeWidth = primitive.getTypeLength();
break;
case INT64:
this.readType = ReadType.LONG_BACKED_DECIMAL;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case INT32:
this.readType = ReadType.INT_BACKED_DECIMAL;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
default:
throw new UnsupportedOperationException(
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
}
break;
default:
throw new UnsupportedOperationException(
"Unsupported logical type: " + primitive.getOriginalType());
}
} else {
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
int len = ((Types.FixedType) icebergField.type()).length();
this.vec = arrowField.createVector(rootAlloc);
vec.setInitialCapacity(batchSize * len);
vec.allocateNew();
this.readType = ReadType.FIXED_WIDTH_BINARY;
this.typeWidth = len;
break;
case BINARY:
this.vec = arrowField.createVector(rootAlloc);
// TODO: Possibly use the uncompressed page size info to set the initial capacity
vec.setInitialCapacity(batchSize * AVERAGE_VARIABLE_WIDTH_RECORD_SIZE);
vec.allocateNewSafe();
this.readType = ReadType.VARBINARY;
this.typeWidth = UNKNOWN_WIDTH;
break;
case INT32:
this.vec = arrowField.createVector(rootAlloc);
((IntVector) vec).allocateNew(batchSize);
this.readType = ReadType.INT;
this.typeWidth = (int) IntVector.TYPE_WIDTH;
break;
case FLOAT:
this.vec = arrowField.createVector(rootAlloc);
((Float4Vector) vec).allocateNew(batchSize);
this.readType = ReadType.FLOAT;
this.typeWidth = (int) Float4Vector.TYPE_WIDTH;
break;
case BOOLEAN:
this.vec = arrowField.createVector(rootAlloc);
((BitVector) vec).allocateNew(batchSize);
this.readType = ReadType.BOOLEAN;
this.typeWidth = UNKNOWN_WIDTH;
break;
case INT64:
this.vec = arrowField.createVector(rootAlloc);
((BigIntVector) vec).allocateNew(batchSize);
this.readType = ReadType.LONG;
this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
break;
case DOUBLE:
this.vec = arrowField.createVector(rootAlloc);
((Float8Vector) vec).allocateNew(batchSize);
this.readType = ReadType.DOUBLE;
this.typeWidth = (int) Float8Vector.TYPE_WIDTH;
break;
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
}
}
}
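
  /**
   * Positions this reader at the start of a row group. The column is read as dictionary ids only when
   * every page in the column chunk is dictionary encoded; otherwise any dictionary present is used to
   * eagerly decode values as they are read.
   */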
@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
ColumnChunkMetaData chunkMetaData = metadata.get(ColumnPath.get(columnDescriptor.getPath()));
this.dictionary = vectorizedColumnIterator.setRowGroupInfo(
source.getPageReader(columnDescriptor),
!ParquetUtil.hasNonDictionaryPages(chunkMetaData));
}
@Override
public void close() {
if (vec != null) {
vec.close();
}
}
@Override
public String toString() {
return columnDescriptor.toString();
}
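
  /** Returns a singleton reader that produces dummy holders instead of reading any file data. */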
public static VectorizedArrowReader nulls() {
return NullVectorReader.INSTANCE;
}
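
  // Backs nulls(): read() returns a dummy holder, and the row-group and batch-size hooks are no-ops.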
private static final class NullVectorReader extends VectorizedArrowReader {
private static final NullVectorReader INSTANCE = new NullVectorReader();
@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.dummyHolder(numValsToRead);
}
@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
}
@Override
public String toString() {
return "NullReader";
}
@Override
public void setBatchSize(int batchSize) {
}
}
/**
 * A dummy vector reader that doesn't actually read files. Instead, it returns a dummy VectorHolder
 * that carries the constant value to be used for this column.
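 *
 * <p>For example (the constant below is illustrative):
 * <pre>
 *   ConstantVectorReader&lt;Long&gt; reader = new ConstantVectorReader&lt;&gt;(42L);
 *   VectorHolder batch = reader.read(null, 1024);  // a holder representing 1024 rows of 42L
 * </pre>
 *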
 * @param <T> the type of the constant value
*/
public static class ConstantVectorReader<T> extends VectorizedArrowReader {
private final T value;
public ConstantVectorReader(T value) {
this.value = value;
}
@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
return VectorHolder.constantHolder(numValsToRead, value);
}
@Override
public void setRowGroupInfo(PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
}
@Override
public String toString() {
return String.format("ConstantReader: %s", value);
}
@Override
public void setBatchSize(int batchSize) {
}
}
}