blob: d963c454d35ec63d4c5b3ef84dff6a827ea34be9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.arrow.vectorized.parquet;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.iceberg.parquet.BaseColumnIterator;
import org.apache.iceberg.parquet.BasePageIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.page.PageReader;
/**
 * Vectorized version of the ColumnIterator that reads column values in data pages of a column in a row group in a
 * batched fashion.
 *
 * <p>Every {@code nextBatch*} method follows the same protocol, implemented once in
 * {@link #readBatch(FieldVector, PageBatchReader)}: pages are consumed until either {@code batchSize} rows have
 * been written into the target vector or the column has no more values. Only the call made on the underlying
 * {@link VectorizedPageIterator} differs between the typed entry points.
 *
 * <p>The batch size is 0 until {@link #setBatchSize(int)} is called, in which case the {@code nextBatch*} methods
 * read nothing.
 */
public class VectorizedColumnIterator extends BaseColumnIterator {

  /**
   * Reads values from the current page into a vector. Implementations delegate to one of the typed
   * {@code nextBatch*} methods of {@link VectorizedPageIterator}.
   */
  @FunctionalInterface
  private interface PageBatchReader {
    /**
     * @param remaining number of rows still missing from the batch ({@code batchSize - rowsReadSoFar})
     * @param rowsReadSoFar number of rows already written into the vector for this batch
     * @return the number of rows the page iterator reports as read by this call
     */
    int read(int remaining, int rowsReadSoFar);
  }

  private final VectorizedPageIterator vectorizedPageIterator;
  private int batchSize;

  /**
   * Creates an iterator for a single, non-nested column.
   *
   * @param desc descriptor of the column to read; its max repetition level must be 0
   * @param writerVersion passed through to the {@link VectorizedPageIterator}
   * @param setArrowValidityVector passed through to the {@link VectorizedPageIterator}
   * @throws IllegalArgumentException if the column is nested (max repetition level is not 0)
   */
  public VectorizedColumnIterator(ColumnDescriptor desc, String writerVersion, boolean setArrowValidityVector) {
    super(desc);
    Preconditions.checkArgument(desc.getMaxRepetitionLevel() == 0,
        "Only non-nested columns are supported for vectorized reads");
    this.vectorizedPageIterator = new VectorizedPageIterator(desc, writerVersion, setArrowValidityVector);
  }

  /**
   * Sets the maximum number of rows that each {@code nextBatch*} call writes into its vector.
   *
   * @param batchSize upper bound on rows read per batch
   */
  public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  /**
   * Points this iterator at the pages of a new row group.
   *
   * @param store page reader supplying the pages of this column
   * @param allPagesDictEncoded whether all the pages in the row group are dictionary encoded
   * @return the {@code dictionary} field inherited from the base class, as it stands after the page source is set
   */
  public Dictionary setRowGroupInfo(PageReader store, boolean allPagesDictEncoded) {
    // setPageSource can result in a data page read. If that happens, we need
    // to know in advance whether all the pages in the row group are dictionary encoded or not
    this.vectorizedPageIterator.setAllPagesDictEncoded(allPagesDictEncoded);
    super.setPageSource(store);
    return dictionary;
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchIntegers}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param holder passed through to the page iterator
   */
  public void nextBatchIntegers(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchIntegers(fieldVector, remaining, rowsReadSoFar, typeWidth, holder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchDictionaryIds}.
   *
   * @param vector vector that receives the values; its value count is updated after every page
   * @param holder passed through to the page iterator
   */
  public void nextBatchDictionaryIds(IntVector vector, NullabilityHolder holder) {
    readBatch(vector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchDictionaryIds(vector, remaining, rowsReadSoFar, holder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchLongs}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param holder passed through to the page iterator
   */
  public void nextBatchLongs(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchLongs(fieldVector, remaining, rowsReadSoFar, typeWidth, holder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchTimestampMillis}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param holder passed through to the page iterator
   */
  public void nextBatchTimestampMillis(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchTimestampMillis(fieldVector, remaining, rowsReadSoFar, typeWidth, holder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchFloats}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param holder passed through to the page iterator
   */
  public void nextBatchFloats(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchFloats(fieldVector, remaining, rowsReadSoFar, typeWidth, holder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchDoubles}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param holder passed through to the page iterator
   */
  public void nextBatchDoubles(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchDoubles(fieldVector, remaining, rowsReadSoFar, typeWidth, holder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchIntBackedDecimal}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param nullabilityHolder passed through to the page iterator
   */
  public void nextBatchIntBackedDecimal(
      FieldVector fieldVector,
      NullabilityHolder nullabilityHolder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchIntBackedDecimal(fieldVector, remaining, rowsReadSoFar, nullabilityHolder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchLongBackedDecimal}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param nullabilityHolder passed through to the page iterator
   */
  public void nextBatchLongBackedDecimal(
      FieldVector fieldVector,
      NullabilityHolder nullabilityHolder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchLongBackedDecimal(fieldVector, remaining, rowsReadSoFar, nullabilityHolder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchFixedLengthDecimal}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param nullabilityHolder passed through to the page iterator
   */
  public void nextBatchFixedLengthDecimal(
      FieldVector fieldVector,
      int typeWidth,
      NullabilityHolder nullabilityHolder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchFixedLengthDecimal(
            fieldVector, remaining, rowsReadSoFar, typeWidth, nullabilityHolder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchVarWidthType}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param nullabilityHolder passed through to the page iterator
   */
  public void nextBatchVarWidthType(FieldVector fieldVector, NullabilityHolder nullabilityHolder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchVarWidthType(fieldVector, remaining, rowsReadSoFar, nullabilityHolder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchFixedWidthBinary}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param typeWidth passed through to the page iterator
   * @param nullabilityHolder passed through to the page iterator
   */
  public void nextBatchFixedWidthBinary(FieldVector fieldVector, int typeWidth, NullabilityHolder nullabilityHolder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchFixedWidthBinary(
            fieldVector, remaining, rowsReadSoFar, typeWidth, nullabilityHolder));
  }

  /**
   * Reads the next batch through {@link VectorizedPageIterator#nextBatchBoolean}.
   *
   * @param fieldVector vector that receives the values; its value count is updated after every page
   * @param nullabilityHolder passed through to the page iterator
   */
  public void nextBatchBoolean(FieldVector fieldVector, NullabilityHolder nullabilityHolder) {
    readBatch(fieldVector, (remaining, rowsReadSoFar) ->
        vectorizedPageIterator.nextBatchBoolean(fieldVector, remaining, rowsReadSoFar, nullabilityHolder));
  }

  @Override
  protected BasePageIterator pageIterator() {
    return vectorizedPageIterator;
  }

  /**
   * @return whatever {@link VectorizedPageIterator#producesDictionaryEncodedVector()} reports for the page iterator
   */
  public boolean producesDictionaryEncodedVector() {
    return vectorizedPageIterator.producesDictionaryEncodedVector();
  }

  /**
   * Shared batching loop behind every public {@code nextBatch*} method.
   *
   * <p>While the batch is not full and the column has more values, advances the iterator, lets {@code pageReader}
   * read from the current page, and then updates the running row count, {@code triplesRead} and the vector's value
   * count. The value count is set after every page rather than once at the end, exactly as each method did before
   * the loop was shared. If the loop body never runs, the vector's value count is left untouched.
   *
   * @param vector vector whose value count tracks the rows read so far in this batch
   * @param pageReader type-specific read against the page iterator
   */
  private void readBatch(FieldVector vector, PageBatchReader pageReader) {
    int rowsReadSoFar = 0;
    while (rowsReadSoFar < batchSize && hasNext()) {
      advance();
      int rowsInThisBatch = pageReader.read(batchSize - rowsReadSoFar, rowsReadSoFar);
      rowsReadSoFar += rowsInThisBatch;
      this.triplesRead += rowsInThisBatch;
      vector.setValueCount(rowsReadSoFar);
    }
  }
}