| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.parquet.columnreaders; |
| |
| import java.math.BigDecimal; |
| import java.nio.ByteBuffer; |
| |
| import org.apache.drill.common.exceptions.ExecutionSetupException; |
| import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder; |
| import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder; |
| import org.apache.drill.exec.store.parquet.ParquetReaderUtility; |
| import org.apache.drill.exec.util.DecimalUtility; |
| import org.apache.drill.exec.vector.NullableBigIntVector; |
| import org.apache.drill.exec.vector.NullableDateVector; |
| import org.apache.drill.exec.vector.NullableDecimal18Vector; |
| import org.apache.drill.exec.vector.NullableDecimal28SparseVector; |
| import org.apache.drill.exec.vector.NullableDecimal38SparseVector; |
| import org.apache.drill.exec.vector.NullableDecimal9Vector; |
| import org.apache.drill.exec.vector.NullableFloat4Vector; |
| import org.apache.drill.exec.vector.NullableFloat8Vector; |
| import org.apache.drill.exec.vector.NullableIntVector; |
| import org.apache.drill.exec.vector.NullableIntervalVector; |
| import org.apache.drill.exec.vector.NullableTimeStampVector; |
| import org.apache.drill.exec.vector.NullableTimeVector; |
| import org.apache.drill.exec.vector.NullableVarBinaryVector; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.parquet.format.SchemaElement; |
| import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; |
| import org.apache.parquet.io.api.Binary; |
| import org.joda.time.DateTimeConstants; |
| |
| import io.netty.buffer.DrillBuf; |
| |
| public class NullableFixedByteAlignedReaders { |
| |
| static class NullableFixedByteAlignedReader<V extends ValueVector> extends NullableColumnReader<V> { |
| protected DrillBuf bytebuf; |
| |
| NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| this.bytebuf = pageReader.pageData; |
| |
| // fill in data. |
| vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength); |
| } |
| } |
| |
| /** |
| * Class for reading the fixed length byte array type in parquet. Currently Drill does not have |
| * a fixed length binary type, so this is read into a varbinary with the same size recorded for |
| * each value. |
| */ |
| static class NullableFixedBinaryReader extends NullableFixedByteAlignedReader<NullableVarBinaryVector> { |
| NullableFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| this.bytebuf = pageReader.pageData; |
| if (usingDictionary) { |
| NullableVarBinaryVector.Mutator mutator = valueVec.getMutator(); |
| Binary currDictValToWrite; |
| for (int i = 0; i < recordsReadInThisIteration; i++){ |
| currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); |
| ByteBuffer buf = currDictValToWrite.toByteBuffer(); |
| mutator.setSafe(valuesReadInCurrentPass + i, buf, buf.position(), |
| currDictValToWrite.length()); |
| } |
| // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding |
| // and we will go into the else condition below. The readField method of the parent class requires the |
| // writer index to be set correctly. |
| int writerIndex = castedBaseVector.getBuffer().writerIndex(); |
| castedBaseVector.getBuffer().setIndex(0, writerIndex + (int)readLength); |
| } else { |
| super.readField(recordsToReadInThisPass); |
| // TODO - replace this with fixed binary type in drill |
| // for now we need to write the lengths of each value |
| int byteLength = dataTypeLengthInBits / 8; |
| for (int i = 0; i < recordsToReadInThisPass; i++) { |
| valueVec.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength); |
| } |
| } |
| } |
| } |
| |
| static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> { |
| |
| NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); |
| } |
| int writerIndex = castedBaseVector.getBuffer().writerIndex(); |
| castedBaseVector.getBuffer().setIndex(0, writerIndex + (int)readLength); |
| } else { |
| |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); |
| } |
| } |
| } |
| } |
| |
| static class NullableDictionaryDecimal9Reader extends NullableColumnReader<NullableDecimal9Vector> { |
| |
| NullableDictionaryDecimal9Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal9Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); |
| } |
| } |
| } |
| } |
| |
| static class NullableDictionaryTimeReader extends NullableColumnReader<NullableTimeVector> { |
| |
| NullableDictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); |
| } |
| } |
| } |
| } |
| |
| static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> { |
| |
| NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong()); |
| } |
| } |
| } |
| } |
| |
| static class NullableDictionaryTimeStampReader extends NullableColumnReader<NullableTimeStampVector> { |
| |
| NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong()); |
| } |
| } |
| } |
| } |
| static class NullableDictionaryDecimal18Reader extends NullableColumnReader<NullableDecimal18Vector> { |
| |
| NullableDictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal18Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong()); |
| } |
| } |
| } |
| } |
| static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> { |
| |
| NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readFloat()); |
| } |
| } |
| } |
| } |
| |
| static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> { |
| |
| NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v, |
| SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| // this method is called by its superclass during a read loop |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| if (usingDictionary) { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); |
| } |
| } else { |
| for (int i = 0; i < recordsToReadInThisPass; i++){ |
| valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readDouble()); |
| } |
| } |
| } |
| } |
| |
| static abstract class NullableConvertedReader<V extends ValueVector> extends NullableFixedByteAlignedReader<V> { |
| |
| protected int dataTypeLengthInBytes; |
| |
| NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, |
| ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| protected void readField(long recordsToReadInThisPass) { |
| |
| this.bytebuf = pageReader.pageData; |
| |
| dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0); |
| for (int i = 0; i < recordsToReadInThisPass; i++) { |
| addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass); |
| } |
| } |
| |
| abstract void addNext(int start, int index); |
| } |
| |
| public static class NullableDateReader extends NullableConvertedReader<NullableDateVector> { |
| NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, |
| boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| void addNext(int start, int index) { |
| int intValue; |
| if (usingDictionary) { |
| intValue = pageReader.dictionaryValueReader.readInteger(); |
| } else { |
| intValue = readIntLittleEndian(bytebuf, start); |
| } |
| |
| valueVec.getMutator().set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY); |
| } |
| |
| } |
| |
| /** |
| * Old versions of Drill were writing a non-standard format for date. See DRILL-4203 |
| */ |
| public static class NullableCorruptDateReader extends NullableConvertedReader<NullableDateVector> { |
| |
| NullableCorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, |
| boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| void addNext(int start, int index) { |
| int intValue; |
| if (usingDictionary) { |
| intValue = pageReader.dictionaryValueReader.readInteger(); |
| } else { |
| intValue = readIntLittleEndian(bytebuf, start); |
| } |
| |
| valueVec.getMutator().set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY); |
| } |
| |
| } |
| |
| /** |
| * Old versions of Drill were writing a non-standard format for date. See DRILL-4203 |
| * |
| * For files that lack enough metadata to determine if the dates are corrupt, we must just |
| * correct values when they look corrupt during this low level read. |
| */ |
| public static class CorruptionDetectingNullableDateReader extends NullableConvertedReader<NullableDateVector> { |
| |
| NullableDateVector dateVector; |
| |
| CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader, int allocateSize, |
| ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, |
| boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) |
| throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| dateVector = (NullableDateVector) v; |
| } |
| |
| @Override |
| void addNext(int start, int index) { |
| int intValue; |
| if (usingDictionary) { |
| intValue = pageReader.dictionaryValueReader.readInteger(); |
| } else { |
| intValue = readIntLittleEndian(bytebuf, start); |
| } |
| |
| if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) { |
| dateVector.getMutator().set(index, (intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT) * DateTimeConstants.MILLIS_PER_DAY); |
| } else { |
| dateVector.getMutator().set(index, intValue * (long) DateTimeConstants.MILLIS_PER_DAY); |
| } |
| } |
| } |
| |
| public static class NullableDecimal28Reader extends NullableConvertedReader<NullableDecimal28SparseVector> { |
| |
| NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, |
| boolean fixedLength, NullableDecimal28SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| void addNext(int start, int index) { |
| int width = NullableDecimal28SparseHolder.WIDTH; |
| BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, |
| schemaElement.getScale()); |
| DecimalUtility.getSparseFromBigDecimal(intermediate, valueVec.getBuffer(), index * width, schemaElement.getScale(), |
| schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits); |
| } |
| } |
| |
| public static class NullableDecimal38Reader extends NullableConvertedReader<NullableDecimal38SparseVector> { |
| NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, |
| boolean fixedLength, NullableDecimal38SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| void addNext(int start, int index) { |
| int width = NullableDecimal38SparseHolder.WIDTH; |
| BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, |
| schemaElement.getScale()); |
| DecimalUtility.getSparseFromBigDecimal(intermediate, valueVec.getBuffer(), index * width, schemaElement.getScale(), |
| schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits); |
| } |
| } |
| |
| public static class NullableIntervalReader extends NullableConvertedReader<NullableIntervalVector> { |
| NullableIntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, |
| boolean fixedLength, NullableIntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException { |
| super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); |
| } |
| |
| @Override |
| void addNext(int start, int index) { |
| if (usingDictionary) { |
| byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes(); |
| valueVec.getMutator().setSafe(index * 12, 1, |
| ParquetReaderUtility.getIntFromLEBytes(input, 0), |
| ParquetReaderUtility.getIntFromLEBytes(input, 4), |
| ParquetReaderUtility.getIntFromLEBytes(input, 8)); |
| } |
| valueVec.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8)); |
| } |
| } |
| } |
| |