/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.arrow.vectorized.parquet;

import java.io.IOException;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.parquet.BasePageIterator;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ValuesAsBytesReader;
import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;

public class VectorizedPageIterator extends BasePageIterator {
  private final boolean setArrowValidityVector;

  public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, boolean setValidityVector) {
    super(desc, writerVersion);
    this.setArrowValidityVector = setValidityVector;
  }

  private ValuesAsBytesReader plainValuesReader = null;
  private VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader = null;
  private boolean allPagesDictEncoded;
  private VectorizedParquetDefinitionLevelReader vectorizedDefinitionLevelReader;

  private enum DictionaryDecodeMode {
    NONE, // plain encoding, no dictionary involved
    LAZY, // values stay dictionary-encoded; the consumer reads dictionary ids plus the dictionary
    EAGER // dictionary values are decoded into the target vector while reading
  }

  private DictionaryDecodeMode dictionaryDecodeMode;

  public void setAllPagesDictEncoded(boolean allDictEncoded) {
    this.allPagesDictEncoded = allDictEncoded;
  }

  @Override
  protected void reset() {
    super.reset();
    this.plainValuesReader = null;
    this.vectorizedDefinitionLevelReader = null;
  }

  /**
   * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like
   * definition levels, dictionary ids in Parquet are RLE/bin-packed encoded as well.
   */
  public int nextBatchDictionaryIds(
      final IntVector vector, final int expectedBatchSize,
      final int numValsInVector,
      NullabilityHolder holder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    vectorizedDefinitionLevelReader.readBatchOfDictionaryIds(
        vector,
        numValsInVector,
        actualBatchSize,
        holder,
        dictionaryEncodedValuesReader);
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }
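
  // A minimal caller sketch (illustrative only -- in practice a column iterator drives these
  // batch methods): fill an IntVector with dictionary ids across page boundaries. Here
  // `pageIterator`, `allocator`, `batchSize` and `nulls` are assumed to already exist.
  //
  //   IntVector ids = new IntVector("dict-ids", allocator);
  //   ids.allocateNew(batchSize);
  //   int filled = 0;
  //   while (filled < batchSize) {
  //     int read = pageIterator.nextBatchDictionaryIds(ids, batchSize - filled, filled, nulls);
  //     if (read == 0) {
  //       break; // page exhausted; the caller must advance to the next page
  //     }
  //     filled += read;
  //   }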

  /**
   * Method for reading a batch of values of INT32 data type.
   */
  public int nextBatchIntegers(
      final FieldVector vector, final int expectedBatchSize,
      final int numValsInVector,
      final int typeWidth, NullabilityHolder holder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedIntegers(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfIntegers(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  /**
   * Method for reading a batch of values of INT64 data type.
   */
  public int nextBatchLongs(
      final FieldVector vector, final int expectedBatchSize,
      final int numValsInVector,
      final int typeWidth, NullabilityHolder holder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedLongs(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfLongs(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  /**
   * Method for reading a batch of values of TIMESTAMP_MILLIS data type. In Iceberg, TIMESTAMP is
   * always represented in microseconds, so values stored in millis are multiplied by 1000 before
   * being written to the vector.
   */
  public int nextBatchTimestampMillis(
      final FieldVector vector, final int expectedBatchSize,
      final int numValsInVector,
      final int typeWidth, NullabilityHolder holder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedTimestampMillis(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfTimestampMillis(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }
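
  // Worked example (illustrative only): a Parquet TIMESTAMP_MILLIS value of 1_577_836_800_000
  // (2020-01-01T00:00:00Z in milliseconds) lands in the Arrow vector as 1_577_836_800_000_000
  // microseconds after the multiplication by 1000 described above.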

  /**
   * Method for reading a batch of values of FLOAT data type.
   */
  public int nextBatchFloats(
      final FieldVector vector, final int expectedBatchSize,
      final int numValsInVector,
      final int typeWidth, NullabilityHolder holder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFloats(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfFloats(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  /**
   * Method for reading a batch of values of DOUBLE data type.
   */
  public int nextBatchDoubles(
      final FieldVector vector, final int expectedBatchSize,
      final int numValsInVector,
      final int typeWidth, NullabilityHolder holder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedDoubles(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfDoubles(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          holder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  private int getActualBatchSize(int expectedBatchSize) {
    return Math.min(expectedBatchSize, triplesCount - triplesRead);
  }
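
  // For example: with triplesCount = 1000 and triplesRead = 990, a request for a batch of 64
  // values is clamped to the 10 triples remaining in the page.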

  /**
   * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since
   * Arrow stores all decimals in 16 bytes, byte arrays are appropriately padded before being
   * written to Arrow data buffers.
   */
  public int nextBatchIntBackedDecimal(
      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
      NullabilityHolder nullabilityHolder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedIntBackedDecimals(
          vector,
          numValsInVector,
          actualBatchSize,
          nullabilityHolder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfIntBackedDecimals(
          vector,
          numValsInVector,
          actualBatchSize,
          nullabilityHolder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }
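
  // Sketch of the padding idea described above (an assumption-level illustration, not the
  // reader's actual code path): a 4-byte INT32 unscaled value is written into the low-order
  // bytes of a 16-byte little-endian buffer and the remaining bytes are sign-extended.
  //
  //   byte[] padded = new byte[16];
  //   int unscaled = -1234;
  //   for (int i = 0; i < 4; i++) {
  //     padded[i] = (byte) (unscaled >> (8 * i)); // little endian: low byte first
  //   }
  //   byte fill = (byte) (unscaled < 0 ? 0xFF : 0x00);
  //   for (int i = 4; i < 16; i++) {
  //     padded[i] = fill; // sign extension preserves the numeric value
  //   }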

  public int nextBatchLongBackedDecimal(
      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
      NullabilityHolder nullabilityHolder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedLongBackedDecimals(
          vector,
          numValsInVector,
          actualBatchSize,
          nullabilityHolder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfLongBackedDecimals(
          vector,
          numValsInVector,
          actualBatchSize,
          nullabilityHolder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  /**
   * Method for reading a batch of decimals backed by the fixed length byte array parquet data
   * type. Arrow stores all decimals in 16 bytes, so this method pads the decimals read as needed.
   * Moreover, Arrow interprets the decimals in the Arrow buffer as little endian, while Parquet
   * stores fixed length decimals as big endian. So this method uses
   * {@link DecimalVector#setBigEndian(int, byte[])} so that the data in the Arrow vector is indeed
   * little endian.
   */
  public int nextBatchFixedLengthDecimal(
      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
      final int typeWidth, NullabilityHolder nullabilityHolder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          nullabilityHolder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfFixedLengthDecimals(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          nullabilityHolder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }
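
  // Illustrative use of DecimalVector#setBigEndian (hypothetical vector and allocator; precision
  // and scale chosen arbitrarily): Parquet stores the unscaled value big endian, so the two-byte
  // array {0x04, 0xD2} represents unscaled 1234, i.e. 12.34 at scale 2.
  //
  //   DecimalVector decimals = new DecimalVector("d", allocator, 10, 2);
  //   decimals.allocateNew(1);
  //   decimals.setBigEndian(0, new byte[] {0x04, (byte) 0xD2});
  //   decimals.setValueCount(1);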

  /**
   * Method for reading a batch of variable width data types (ENUM, JSON, UTF8, BSON).
   */
  public int nextBatchVarWidthType(
      final FieldVector vector,
      final int expectedBatchSize,
      final int numValsInVector,
      NullabilityHolder nullabilityHolder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedVarWidth(
          vector,
          numValsInVector,
          actualBatchSize,
          nullabilityHolder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchVarWidth(
          vector,
          numValsInVector,
          actualBatchSize,
          nullabilityHolder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  /**
   * Method for reading batches of fixed width binary type (e.g. BYTE[7]). Spark does not support
   * the fixed width binary data type. To work around this limitation, the data is read as fixed
   * width binary from Parquet and stored in a {@link VarBinaryVector} in Arrow.
   */
  public int nextBatchFixedWidthBinary(
      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
      final int typeWidth, NullabilityHolder nullabilityHolder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          nullabilityHolder,
          dictionaryEncodedValuesReader,
          dictionary);
    } else {
      vectorizedDefinitionLevelReader.readBatchOfFixedWidthBinary(
          vector,
          numValsInVector,
          typeWidth,
          actualBatchSize,
          nullabilityHolder,
          plainValuesReader);
    }
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }
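
  // Hedged sketch of the workaround described above (vector name and allocator are hypothetical):
  // each fixed-width value, e.g. a 7-byte FIXED_LEN_BYTE_ARRAY, is simply stored as a
  // variable-width entry in a VarBinaryVector.
  //
  //   VarBinaryVector binaries = new VarBinaryVector("fixed7", allocator);
  //   binaries.allocateNew();
  //   byte[] value = new byte[] {1, 2, 3, 4, 5, 6, 7}; // typeWidth == 7
  //   binaries.setSafe(0, value);
  //   binaries.setValueCount(1);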

  public boolean producesDictionaryEncodedVector() {
    return dictionaryDecodeMode == DictionaryDecodeMode.LAZY;
  }

  /**
   * Method for reading batches of booleans.
   */
  public int nextBatchBoolean(
      final FieldVector vector,
      final int expectedBatchSize,
      final int numValsInVector,
      NullabilityHolder nullabilityHolder) {
    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
    if (actualBatchSize <= 0) {
      return 0;
    }
    vectorizedDefinitionLevelReader
        .readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
            nullabilityHolder, plainValuesReader);
    triplesRead += actualBatchSize;
    this.hasNext = triplesRead < triplesCount;
    return actualBatchSize;
  }

  @Override
  protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
    ValuesReader previousReader = plainValuesReader;
    if (dataEncoding.usesDictionary()) {
      if (dictionary == null) {
        throw new ParquetDecodingException(
            "could not read page in col " + desc + " as the dictionary was missing for encoding " + dataEncoding);
      }
      try {
        dictionaryEncodedValuesReader =
            new VectorizedDictionaryEncodedParquetValuesReader(desc.getMaxDefinitionLevel(), setArrowValidityVector);
        dictionaryEncodedValuesReader.initFromPage(valueCount, in);
        // int-typed columns are always decoded eagerly; other types stay dictionary-encoded
        // (LAZY) only when every page in the column chunk is dictionary-encoded
        if (ParquetUtil.isIntType(desc.getPrimitiveType()) || !allPagesDictEncoded) {
          dictionaryDecodeMode = DictionaryDecodeMode.EAGER;
        } else {
          dictionaryDecodeMode = DictionaryDecodeMode.LAZY;
        }
      } catch (IOException e) {
        throw new ParquetDecodingException("could not read page in col " + desc, e);
      }
    } else {
      plainValuesReader = new ValuesAsBytesReader();
      plainValuesReader.initFromPage(valueCount, in);
      dictionaryDecodeMode = DictionaryDecodeMode.NONE;
    }
    if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding) &&
        previousReader instanceof RequiresPreviousReader) {
      // previous reader can only be set if reading sequentially
      ((RequiresPreviousReader) plainValuesReader).setPreviousReader(previousReader);
    }
  }

  @Override
  protected void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor desc, ByteBufferInputStream in,
      int triplesCount) throws IOException {
    this.vectorizedDefinitionLevelReader = newVectorizedDefinitionLevelReader(desc);
    this.vectorizedDefinitionLevelReader.initFromPage(triplesCount, in);
  }

  @Override
  protected void initDefinitionLevelsReader(DataPageV2 dataPageV2, ColumnDescriptor desc) {
    this.vectorizedDefinitionLevelReader = newVectorizedDefinitionLevelReader(desc);
  }

  private VectorizedParquetDefinitionLevelReader newVectorizedDefinitionLevelReader(ColumnDescriptor desc) {
    int bitwidth = BytesUtils.getWidthFromMaxInt(desc.getMaxDefinitionLevel());
    return new VectorizedParquetDefinitionLevelReader(bitwidth, desc.getMaxDefinitionLevel(), setArrowValidityVector);
  }
}