| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.parquet.columnreaders; |
| |
| import org.apache.drill.common.exceptions.ExecutionSetupException; |
| import org.apache.drill.exec.vector.BigIntVector; |
| import org.apache.drill.exec.vector.Float4Vector; |
| import org.apache.drill.exec.vector.Float8Vector; |
| import org.apache.drill.exec.vector.IntVector; |
| import org.apache.drill.exec.vector.TimeStampVector; |
| import org.apache.drill.exec.vector.TimeVector; |
| import org.apache.drill.exec.vector.UInt4Vector; |
| import org.apache.drill.exec.vector.UInt8Vector; |
| import org.apache.drill.exec.vector.VarBinaryVector; |
| import org.apache.drill.exec.vector.VarDecimalVector; |
| import org.apache.drill.shaded.guava.com.google.common.primitives.Ints; |
| import org.apache.drill.shaded.guava.com.google.common.primitives.Longs; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.parquet.format.SchemaElement; |
| import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; |
| import org.apache.parquet.io.api.Binary; |
| |
| import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; |
| |
| public class ParquetFixedWidthDictionaryReaders { |
| |
| private static final double BITS_COUNT_IN_BYTE_DOUBLE_VALUE = 8.0; |
| |
| static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> { |
| DictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); |
| } |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| /** |
| * This class uses for reading unsigned integer fields. |
| */ |
| static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> { |
| DictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| |
| if (usingDictionary) { |
| UInt4Vector.Mutator mutator = valueVec.getMutator(); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); |
| } |
| // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding |
| // and we will go into the else condition below. The readField method of the parent class requires the |
| // writer index to be set correctly. |
| readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; |
| readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE); |
| int writerIndex = valueVec.getBuffer().writerIndex(); |
| valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> { |
| DictionaryFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; |
| readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE); |
| |
| if (usingDictionary) { |
| VarBinaryVector.Mutator mutator = valueVec.getMutator(); |
| Binary currDictValToWrite = null; |
| for (int i = 0; i < recordsReadInThisIteration; i++){ |
| currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); |
| mutator.setSafe(valuesReadInCurrentPass + i, currDictValToWrite.toByteBuffer().slice(), 0, |
| currDictValToWrite.length()); |
| } |
| // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding |
| // and we will go into the else condition below. The readField method of the parent class requires the |
| // writer index to be set correctly. |
| int writerIndex = valueVec.getBuffer().writerIndex(); |
| valueVec.getBuffer().setIndex(0, writerIndex + (int)readLength); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| |
| // TODO - replace this with fixed binary type in drill |
| // now we need to write the lengths of each value |
| int byteLength = dataTypeLengthInBits / 8; |
| for (int i = 0; i < recordsToReadInThisPass; i++) { |
| valueVec.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength); |
| } |
| } |
| } |
| |
| static class DictionaryTimeReader extends FixedByteAlignedReader<TimeVector> { |
| DictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| for (int i = 0; i < recordsReadInThisIteration; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); |
| } |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| static class DictionaryBigIntReader extends FixedByteAlignedReader<BigIntVector> { |
| DictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| |
| if (usingDictionary) { |
| BigIntVector.Mutator mutator = valueVec.getMutator(); |
| for (int i = 0; i < recordsReadInThisIteration; i++){ |
| mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); |
| } |
| // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding |
| // and we will go into the else condition below. The readField method of the parent class requires the |
| // writer index to be set correctly. |
| readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; |
| readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE); |
| int writerIndex = valueVec.getBuffer().writerIndex(); |
| valueVec.getBuffer().setIndex(0, writerIndex + (int)readLength); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| /** |
| * This class uses for reading unsigned BigInt fields. |
| */ |
| static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> { |
| DictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| |
| if (usingDictionary) { |
| UInt8Vector.Mutator mutator = valueVec.getMutator(); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); |
| } |
| // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding |
| // and we will go into the else condition below. The readField method of the parent class requires the |
| // writer index to be set correctly. |
| readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; |
| readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE); |
| int writerIndex = valueVec.getBuffer().writerIndex(); |
| valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| static class DictionaryVarDecimalReader extends FixedByteAlignedReader<VarDecimalVector> { |
| |
| DictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| recordsReadInThisIteration = |
| Math.min(pageReader.currentPageCount - pageReader.valuesRead, |
| recordsToReadInThisPass - valuesReadInCurrentPass); |
| |
| switch (columnDescriptor.getType()) { |
| case INT32: |
| if (usingDictionary) { |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| byte[] bytes = Ints.toByteArray(pageReader.dictionaryValueReader.readInteger()); |
| setValueBytes(i, bytes); |
| } |
| setWriteIndex(); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| break; |
| case INT64: |
| if (usingDictionary) { |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| byte[] bytes = Longs.toByteArray(pageReader.dictionaryValueReader.readLong()); |
| setValueBytes(i, bytes); |
| } |
| setWriteIndex(); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| break; |
| } |
| } |
| |
| /** |
| * Set the write Index. The next page that gets read might be a page that does not use dictionary encoding |
| * and we will go into the else condition below. The readField method of the parent class requires the |
| * writer index to be set correctly. |
| */ |
| private void setWriteIndex() { |
| readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; |
| readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE); |
| int writerIndex = valueVec.getBuffer().writerIndex(); |
| valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength); |
| } |
| |
| private void setValueBytes(int i, byte[] bytes) { |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, bytes.length); |
| } |
| } |
| |
| static class DictionaryTimeStampReader extends FixedByteAlignedReader<TimeStampVector> { |
| DictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); |
| } |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> { |
| DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true)); |
| } |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> { |
| DictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); |
| } |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| |
| static class DictionaryFloat8Reader extends FixedByteAlignedReader<Float8Vector> { |
| DictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| recordsReadInThisIteration = Math.min(pageReader.currentPageCount |
| - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); |
| for (int i = 0; i < recordsReadInThisIteration; i++) { |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); |
| } |
| } else { |
| super.readField(recordsToReadInThisPass); |
| } |
| } |
| } |
| } |