DRILL-5983: Add missing nullable Parquet readers for INT and UINT logical types
closes #1866
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 03d5382..7f8c018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -19,8 +19,6 @@
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.vector.VarDecimalVector;
-import org.apache.drill.exec.vector.NullableVarDecimalVector;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.DateVector;
@@ -37,8 +35,11 @@
import org.apache.drill.exec.vector.NullableIntervalVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableUInt8Vector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.TimeVector;
import org.apache.drill.exec.vector.UInt4Vector;
@@ -46,6 +47,7 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
@@ -57,21 +59,22 @@
public class ColumnReaderFactory {
/**
- * @param fixedLength
- * @param descriptor
- * @param columnChunkMetaData
+ * Creates fixed column reader for the given column based on its metadata.
+ *
+ * @param fixedLength if fixed length reader should be used
+ * @param descriptor column descriptor
+ * @param columnChunkMetaData column metadata
+ *
* @return ColumnReader object instance
- * @throws SchemaChangeException
*/
static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, ValueVector v,
- SchemaElement schemaElement)
- throws Exception {
+ ColumnChunkMetaData columnChunkMetaData, ValueVector v,
+ SchemaElement schemaElement) throws Exception {
ConvertedType convertedType = schemaElement.getConverted_type();
// if the column is required, or repeated (in which case we just want to use this to generate our appropriate
// ColumnReader for actually transferring data into the data vector inside of our repeated vector
if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
- if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
return new BitReader(recordReader, descriptor, columnChunkMetaData,
fixedLength, (BitVector) v, schemaElement);
} else if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && (
@@ -279,6 +282,16 @@
return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
}
switch (convertedType) {
+ case INT_8:
+ case INT_16:
+ case INT_32:
+ return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
+ case UINT_8:
+ case UINT_16:
+ case UINT_32:
+ return new NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength, (NullableUInt4Vector) valueVec, schemaElement);
case DECIMAL:
return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
@@ -292,6 +305,9 @@
return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
}
switch (convertedType) {
+ case UINT_64:
+ return new NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, columnDescriptor,
+ columnChunkMetaData, fixedLength, (NullableUInt8Vector) valueVec, schemaElement);
case DECIMAL:
return new NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableVarDecimalVector) valueVec, schemaElement);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 886721e..94a1f59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -17,10 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import java.nio.ByteBuffer;
-
-import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
@@ -32,16 +29,21 @@
import org.apache.drill.exec.vector.NullableIntervalVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableUInt4Vector;
+import org.apache.drill.exec.vector.NullableUInt8Vector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarDecimalVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.api.Binary;
import org.joda.time.DateTimeConstants;
-import io.netty.buffer.DrillBuf;
+import java.nio.ByteBuffer;
+
import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
public class NullableFixedByteAlignedReaders {
@@ -159,6 +161,31 @@
}
}
+ static class NullableDictionaryUInt4Reader extends NullableColumnReader<NullableUInt4Vector> {
+
+ NullableDictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableUInt4Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+ }
+ int writerIndex = castedBaseVector.getBuffer().writerIndex();
+ castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) readLength);
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger());
+ }
+ }
+ }
+ }
+
static class NullableDictionaryTimeReader extends NullableColumnReader<NullableTimeVector> {
NullableDictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
@@ -205,6 +232,31 @@
}
}
+ static class NullableDictionaryUInt8Reader extends NullableColumnReader<NullableUInt8Vector> {
+
+ NullableDictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableUInt8Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ }
+ int writerIndex = castedBaseVector.getBuffer().writerIndex();
+ castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) readLength);
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong());
+ }
+ }
+ }
+ }
+
static class NullableDictionaryTimeStampReader extends NullableColumnReader<NullableTimeStampVector> {
NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
@@ -463,4 +515,3 @@
}
}
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index e25ef1a..9e019ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.Float4Vector;
@@ -30,6 +28,8 @@
import org.apache.drill.exec.vector.UInt8Vector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
+import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -51,14 +51,14 @@
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
-
- recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
if (usingDictionary) {
- for (int i = 0; i < recordsReadInThisIteration; i++){
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
}
+ } else {
+ super.readField(recordsToReadInThisPass);
}
}
}
@@ -150,14 +150,14 @@
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
-
- recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
if (usingDictionary) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
}
+ } else {
+ super.readField(recordsToReadInThisPass);
}
}
}
@@ -297,16 +297,14 @@
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
-
- recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
- for (int i = 0; i < recordsReadInThisIteration; i++){
- try {
+ if (usingDictionary) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
- } catch ( Exception ex) {
- throw ex;
}
+ } else {
+ super.readField(recordsToReadInThisPass);
}
}
}
@@ -321,17 +319,15 @@
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
-
- recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
- for (int i = 0; i < recordsReadInThisIteration; i++){
- try {
+ if (usingDictionary) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes();
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true));
- } catch ( Exception ex) {
- throw ex;
}
+ } else {
+ super.readField(recordsToReadInThisPass);
}
}
}
@@ -346,11 +342,14 @@
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ if (usingDictionary) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
- for (int i = 0; i < recordsReadInThisIteration; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ }
+ } else {
+ super.readField(recordsToReadInThisPass);
}
}
}
@@ -365,11 +364,14 @@
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ if (usingDictionary) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
- for (int i = 0; i < recordsReadInThisIteration; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ }
+ } else {
+ super.readField(recordsToReadInThisPass);
}
}
}