blob: 74d9e1527985b116168d01d151e996a4181bd09e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.arrow.vectorized.parquet;
import java.nio.ByteBuffer;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
import org.apache.parquet.column.Dictionary;
/**
* This decoder reads Parquet dictionary encoded data in a vectorized fashion. Unlike other
* vectorized readers, methods in this decoder don't need to read definition levels. In other
* words, these methods are called when there are non-null values to be read.
*/
public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectorizedParquetValuesReader {

  // Every readBatchOf* method below follows the same loop:
  //   1. when the current group is exhausted (currentCount == 0), load the next one via readNextGroup();
  //   2. consume min(left, currentCount) ids from that group;
  //   3. in RLE mode the same id (currentValue) is used for every slot of the run, in PACKED mode ids are
  //      taken one by one from packedValuesBuffer, advancing packedValuesBufferIdx;
  //   4. the consumed count is subtracted from both the caller's remaining count and currentCount.
  // The loops are intentionally kept inline per type rather than routed through a callback.

  /**
   * Creates a reader; both arguments are forwarded unchanged to the superclass constructor.
   *
   * @param maxDefLevel forwarded to the base reader
   * @param setValidityVector forwarded to the base reader. NOTE(review): presumably this backs the
   *     {@code setArrowValidityVector} flag consulted by {@code setNotNull} -- confirm in the base class.
   */
  public VectorizedDictionaryEncodedParquetValuesReader(int maxDefLevel, boolean setValidityVector) {
    super(maxDefLevel, setValidityVector);
  }

  /**
   * Writes the raw dictionary ids (not the decoded values) into {@code intVector}.
   *
   * @param intVector destination vector
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param nullabilityHolder every written slot is marked not-null in this holder
   */
  void readBatchOfDictionaryIds(IntVector intVector, int startOffset, int numValuesToRead,
                                NullabilityHolder nullabilityHolder) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            intVector.set(idx, currentValue);
            setNotNull(intVector, nullabilityHolder, idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            intVector.set(idx, packedValuesBuffer[packedValuesBufferIdx++]);
            setNotNull(intVector, nullabilityHolder, idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to longs and writes them straight into the vector's data buffer.
   *
   * @param vector destination vector
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into long values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   * @param typeWidth multiplier applied to the slot index to obtain the position passed to the data buffer
   */
  void readBatchOfDictionaryEncodedLongs(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
                                         NullabilityHolder nullabilityHolder, int typeWidth) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(currentValue));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer()
                .setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Same as {@link #readBatchOfDictionaryEncodedLongs} except that every decoded long is multiplied by 1000
   * before it is stored. NOTE(review): going by the method name this presumably converts milliseconds to
   * microseconds -- confirm against the caller. The multiplication is not overflow-checked.
   *
   * @param vector destination vector
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into long values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   * @param typeWidth multiplier applied to the slot index to obtain the position passed to the data buffer
   */
  void readBatchOfDictionaryEncodedTimestampMillis(
      FieldVector vector, int startOffset, int numValuesToRead,
      Dictionary dict, NullabilityHolder nullabilityHolder, int typeWidth) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer().setLong(idx * typeWidth, dict.decodeToLong(currentValue) * 1000);
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer()
                .setLong(idx * typeWidth, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]) * 1000);
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to ints and writes them straight into the vector's data buffer.
   *
   * @param vector destination vector
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into int values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   * @param typeWidth multiplier applied to the slot index to obtain the position passed to the data buffer
   */
  void readBatchOfDictionaryEncodedIntegers(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
                                            NullabilityHolder nullabilityHolder, int typeWidth) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer().setInt(idx * typeWidth, dict.decodeToInt(currentValue));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer()
                .setInt(idx * typeWidth, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to floats and writes them straight into the vector's data buffer.
   *
   * @param vector destination vector
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into float values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   * @param typeWidth multiplier applied to the slot index to obtain the position passed to the data buffer
   */
  void readBatchOfDictionaryEncodedFloats(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
                                          NullabilityHolder nullabilityHolder, int typeWidth) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer().setFloat(idx * typeWidth, dict.decodeToFloat(currentValue));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer()
                .setFloat(idx * typeWidth, dict.decodeToFloat(packedValuesBuffer[packedValuesBufferIdx++]));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to doubles and writes them straight into the vector's data buffer.
   *
   * @param vector destination vector
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into double values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   * @param typeWidth multiplier applied to the slot index to obtain the position passed to the data buffer
   */
  void readBatchOfDictionaryEncodedDoubles(FieldVector vector, int startOffset, int numValuesToRead, Dictionary dict,
                                           NullabilityHolder nullabilityHolder, int typeWidth) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer().setDouble(idx * typeWidth, dict.decodeToDouble(currentValue));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            vector.getDataBuffer()
                .setDouble(idx * typeWidth, dict.decodeToDouble(packedValuesBuffer[packedValuesBufferIdx++]));
            setNotNull(vector, nullabilityHolder, idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to binary values and copies their bytes into the vector's data buffer, one value
   * per {@code typeWidth}-sized slot.
   *
   * @param vector destination vector
   * @param typeWidth multiplier applied to the slot index to obtain the position passed to the data buffer
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into binary values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   */
  void readBatchOfDictionaryEncodedFixedWidthBinary(FieldVector vector, int typeWidth, int startOffset,
                                                    int numValuesToRead, Dictionary dict,
                                                    NullabilityHolder nullabilityHolder) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            ByteBuffer buffer = dict.decodeToBinary(currentValue).toByteBuffer();
            setFixedWidthBinary(vector, typeWidth, nullabilityHolder, idx, buffer);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            ByteBuffer buffer = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).toByteBuffer();
            setFixedWidthBinary(vector, typeWidth, nullabilityHolder, idx, buffer);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Copies the remaining bytes of {@code buffer} into the data buffer at {@code idx * typeWidth} and marks
   * the slot not-null.
   *
   * <p>Array-backed buffers are read in place. Buffers without an accessible backing array (direct or
   * read-only), for which {@link ByteBuffer#array()} would throw, are first copied to a heap array.
   */
  private void setFixedWidthBinary(
      FieldVector vector, int typeWidth, NullabilityHolder nullabilityHolder,
      int idx, ByteBuffer buffer) {
    int length = buffer.remaining();
    if (buffer.hasArray()) {
      vector.getDataBuffer()
          .setBytes(idx * typeWidth, buffer.array(), buffer.position() + buffer.arrayOffset(), length);
    } else {
      byte[] bytes = copyRemaining(buffer);
      vector.getDataBuffer().setBytes(idx * typeWidth, bytes, 0, length);
    }
    setNotNull(vector, nullabilityHolder, idx);
  }

  /**
   * Writes the remaining bytes of {@code buffer} as the variable-width value at {@code idx} and marks the
   * slot not-null in {@code nullabilityHolder}.
   *
   * <p>Array-backed buffers are read in place. Buffers without an accessible backing array (direct or
   * read-only), for which {@link ByteBuffer#array()} would throw, are first copied to a heap array.
   * NOTE(review): the validity buffer is not set explicitly here; presumably {@code setSafe} marks the slot
   * valid -- confirm against the Arrow version in use.
   */
  private void setVarWidthBinary(
      FieldVector vector, NullabilityHolder nullabilityHolder, int idx, ByteBuffer buffer) {
    BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) vector;
    int length = buffer.remaining();
    if (buffer.hasArray()) {
      varWidthVector.setSafe(idx, buffer.array(), buffer.position() + buffer.arrayOffset(), length);
    } else {
      varWidthVector.setSafe(idx, copyRemaining(buffer), 0, length);
    }
    nullabilityHolder.setNotNull(idx);
  }

  /**
   * Returns a fresh array holding the bytes between {@code buffer}'s position and limit. Reads through a
   * duplicate so the position of {@code buffer} itself is left untouched.
   */
  private static byte[] copyRemaining(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);
    return bytes;
  }

  /**
   * Marks slot {@code idx} as not-null in {@code nullabilityHolder} and, when {@code setArrowValidityVector}
   * is enabled, also sets the corresponding bit in the vector's validity buffer.
   */
  private void setNotNull(FieldVector vector, NullabilityHolder nullabilityHolder, int idx) {
    nullabilityHolder.setNotNull(idx);
    if (setArrowValidityVector) {
      BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), idx);
    }
  }

  /**
   * Decodes dictionary ids to binary values and stores them as big-endian decimals. For every value the
   * first {@code typeWidth} bytes of the decoded binary are copied into a fresh array that is handed to
   * {@link DecimalVector#setBigEndian}.
   *
   * <p>NOTE(review): only {@code nullabilityHolder} is updated explicitly; presumably
   * {@code setBigEndian} marks the slot valid in the vector -- confirm against the Arrow version in use.
   *
   * @param vector destination vector; must be a {@link DecimalVector}
   * @param typeWidth number of bytes taken from each decoded binary value
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into binary values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   */
  void readBatchOfDictionaryEncodedFixedLengthDecimals(FieldVector vector, int typeWidth, int startOffset,
                                                       int numValuesToRead, Dictionary dict,
                                                       NullabilityHolder nullabilityHolder) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
            byte[] vectorBytes = new byte[typeWidth];
            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
            nullabilityHolder.setNotNull(idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
            byte[] vectorBytes = new byte[typeWidth];
            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
            ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
            nullabilityHolder.setNotNull(idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to binary values and stores them in a variable-width vector.
   *
   * @param vector destination vector; must be a {@link BaseVariableWidthVector}
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into binary values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   */
  void readBatchOfDictionaryEncodedVarWidthBinary(FieldVector vector, int startOffset, int numValuesToRead,
                                                  Dictionary dict, NullabilityHolder nullabilityHolder) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            ByteBuffer buffer = dict.decodeToBinary(currentValue).toByteBuffer();
            setVarWidthBinary(vector, nullabilityHolder, idx, buffer);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            ByteBuffer buffer = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).toByteBuffer();
            setVarWidthBinary(vector, nullabilityHolder, idx, buffer);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to ints and stores them through {@link DecimalVector}'s {@code set}.
   *
   * <p>NOTE(review): only {@code nullabilityHolder} is updated explicitly; presumably the vector's
   * {@code set} marks the slot valid -- confirm against the Arrow version in use.
   *
   * @param vector destination vector; must be a {@link DecimalVector}
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into int values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   */
  void readBatchOfDictionaryEncodedIntBackedDecimals(FieldVector vector, int startOffset,
                                                     int numValuesToRead, Dictionary dict,
                                                     NullabilityHolder nullabilityHolder) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            ((DecimalVector) vector).set(
                idx,
                dict.decodeToInt(currentValue));
            nullabilityHolder.setNotNull(idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            ((DecimalVector) vector).set(
                idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
            nullabilityHolder.setNotNull(idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }

  /**
   * Decodes dictionary ids to longs and stores them through {@link DecimalVector}'s {@code set}.
   *
   * <p>NOTE(review): only {@code nullabilityHolder} is updated explicitly; presumably the vector's
   * {@code set} marks the slot valid -- confirm against the Arrow version in use.
   *
   * @param vector destination vector; must be a {@link DecimalVector}
   * @param startOffset index of the first slot to write
   * @param numValuesToRead number of consecutive slots to fill
   * @param dict dictionary used to turn ids into long values
   * @param nullabilityHolder every written slot is marked not-null in this holder
   */
  void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int startOffset,
                                                      int numValuesToRead, Dictionary dict,
                                                      NullabilityHolder nullabilityHolder) {
    int left = numValuesToRead;
    int idx = startOffset;
    while (left > 0) {
      if (this.currentCount == 0) {
        this.readNextGroup();
      }
      int num = Math.min(left, this.currentCount);
      switch (mode) {
        case RLE:
          for (int i = 0; i < num; i++) {
            ((DecimalVector) vector).set(
                idx,
                dict.decodeToLong(currentValue));
            nullabilityHolder.setNotNull(idx);
            idx++;
          }
          break;
        case PACKED:
          for (int i = 0; i < num; i++) {
            ((DecimalVector) vector).set(
                idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
            nullabilityHolder.setNotNull(idx);
            idx++;
          }
          break;
      }
      left -= num;
      currentCount -= num;
    }
  }
}