blob: 9e019ee1945907c04c3f082b67337fcca4d8acdd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.Float4Vector;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.TimeVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.UInt8Vector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarDecimalVector;
import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.api.Binary;
import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
public class ParquetFixedWidthDictionaryReaders {
private static final double BITS_COUNT_IN_BYTE_DOUBLE_VALUE = 8.0;
static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> {
DictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (usingDictionary) {
recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
/**
 * Reader used for unsigned integer fields; values are stored in a {@link UInt4Vector}.
 */
static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> {

  DictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v,
                        SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Called by the superclass during a read loop. The iteration size is computed first in either
   * case; dictionary-encoded values are then read individually, while anything else is handed to
   * the superclass.
   *
   * @param recordsToReadInThisPass number of records requested for the current pass
   */
  @Override
  protected void readField(long recordsToReadInThisPass) {
    long leftInPage = pageReader.currentPageCount - pageReader.valuesRead;
    long leftInPass = recordsToReadInThisPass - valuesReadInCurrentPass;
    recordsReadInThisIteration = Math.min(leftInPage, leftInPass);

    if (!usingDictionary) {
      super.readField(recordsToReadInThisPass);
      return;
    }

    UInt4Vector.Mutator mutator = valueVec.getMutator();
    for (int idx = 0; idx < recordsReadInThisIteration; idx++) {
      mutator.setSafe(valuesReadInCurrentPass + idx, pageReader.dictionaryValueReader.readInteger());
    }
    setWriteIndex();
  }

  /**
   * Advances the value buffer's writer index by the number of bytes covered by this iteration.
   * A following page may not be dictionary encoded, in which case the superclass
   * {@code readField} is used, and that method requires the writer index to be set correctly.
   */
  private void setWriteIndex() {
    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
    // Bits are rounded up to whole bytes.
    readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE);
    int currentWriterIndex = valueVec.getBuffer().writerIndex();
    valueVec.getBuffer().setIndex(0, currentWriterIndex + (int) readLength);
  }
}
/**
 * Reader that stores fixed-length binary values in a {@link VarBinaryVector}. Because the target
 * vector is variable-width, the length of every value is written explicitly after the data.
 */
static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> {

  DictionaryFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                              ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
                              SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Called by the superclass during a read loop. Reads the values of this iteration (through the
   * dictionary value reader when dictionary encoding is in use, through the superclass otherwise)
   * and then records the fixed byte length of each value that was read.
   *
   * @param recordsToReadInThisPass number of records requested for the current pass
   */
  @Override
  protected void readField(long recordsToReadInThisPass) {
    recordsReadInThisIteration = Math.min(pageReader.currentPageCount
        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
    // Bits are rounded up to whole bytes.
    readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE);

    VarBinaryVector.Mutator mutator = valueVec.getMutator();
    if (usingDictionary) {
      for (int i = 0; i < recordsReadInThisIteration; i++) {
        Binary dictionaryValue = pageReader.dictionaryValueReader.readBytes();
        mutator.setSafe(valuesReadInCurrentPass + i, dictionaryValue.toByteBuffer().slice(), 0,
            dictionaryValue.length());
      }
      // Set the write index. The next page that gets read might be a page that does not use dictionary
      // encoding, and the readField method of the parent class requires the writer index to be set correctly.
      int writerIndex = valueVec.getBuffer().writerIndex();
      valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
    } else {
      super.readField(recordsToReadInThisPass);
    }

    // TODO - replace this with fixed binary type in drill
    // Now write the length of each value. Only the values produced by this iteration are touched:
    // the bound is recordsReadInThisIteration (read after the branch above, so a value stored by the
    // superclass is honored) rather than the size of the whole pass, which would write lengths past
    // the data that has actually been read.
    int byteLength = dataTypeLengthInBits / 8;
    for (int i = 0; i < recordsReadInThisIteration; i++) {
      mutator.setValueLengthSafe(valuesReadInCurrentPass + i, byteLength);
    }
  }
}
static class DictionaryTimeReader extends FixedByteAlignedReader<TimeVector> {
DictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (usingDictionary) {
recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
/**
 * Reader that fills a {@link BigIntVector}, pulling long values from the page's dictionary value
 * reader when dictionary encoding is in use.
 */
static class DictionaryBigIntReader extends FixedByteAlignedReader<BigIntVector> {

  DictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v,
                         SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Called by the superclass during a read loop. The iteration size is computed first in either
   * case; dictionary-encoded values are then read individually, while anything else is handed to
   * the superclass.
   *
   * @param recordsToReadInThisPass number of records requested for the current pass
   */
  @Override
  protected void readField(long recordsToReadInThisPass) {
    long leftInPage = pageReader.currentPageCount - pageReader.valuesRead;
    long leftInPass = recordsToReadInThisPass - valuesReadInCurrentPass;
    recordsReadInThisIteration = Math.min(leftInPage, leftInPass);

    if (!usingDictionary) {
      super.readField(recordsToReadInThisPass);
      return;
    }

    BigIntVector.Mutator mutator = valueVec.getMutator();
    for (int idx = 0; idx < recordsReadInThisIteration; idx++) {
      mutator.setSafe(valuesReadInCurrentPass + idx, pageReader.dictionaryValueReader.readLong());
    }
    setWriteIndex();
  }

  /**
   * Advances the value buffer's writer index by the number of bytes covered by this iteration.
   * A following page may not be dictionary encoded, in which case the superclass
   * {@code readField} is used, and that method requires the writer index to be set correctly.
   */
  private void setWriteIndex() {
    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
    // Bits are rounded up to whole bytes.
    readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE);
    int currentWriterIndex = valueVec.getBuffer().writerIndex();
    valueVec.getBuffer().setIndex(0, currentWriterIndex + (int) readLength);
  }
}
/**
 * Reader used for unsigned BigInt fields; values are stored in a {@link UInt8Vector}.
 */
static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> {

  DictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v,
                        SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Called by the superclass during a read loop. The iteration size is computed first in either
   * case; dictionary-encoded values are then read individually as longs, while anything else is
   * handed to the superclass.
   *
   * @param recordsToReadInThisPass number of records requested for the current pass
   */
  @Override
  protected void readField(long recordsToReadInThisPass) {
    long leftInPage = pageReader.currentPageCount - pageReader.valuesRead;
    long leftInPass = recordsToReadInThisPass - valuesReadInCurrentPass;
    recordsReadInThisIteration = Math.min(leftInPage, leftInPass);

    if (!usingDictionary) {
      super.readField(recordsToReadInThisPass);
      return;
    }

    UInt8Vector.Mutator mutator = valueVec.getMutator();
    for (int idx = 0; idx < recordsReadInThisIteration; idx++) {
      mutator.setSafe(valuesReadInCurrentPass + idx, pageReader.dictionaryValueReader.readLong());
    }
    setWriteIndex();
  }

  /**
   * Advances the value buffer's writer index by the number of bytes covered by this iteration.
   * A following page may not be dictionary encoded, in which case the superclass
   * {@code readField} is used, and that method requires the writer index to be set correctly.
   */
  private void setWriteIndex() {
    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
    // Bits are rounded up to whole bytes.
    readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE);
    int currentWriterIndex = valueVec.getBuffer().writerIndex();
    valueVec.getBuffer().setIndex(0, currentWriterIndex + (int) readLength);
  }
}
/**
 * Reader that fills a {@link VarDecimalVector} from columns whose physical type is INT32 or INT64.
 * Dictionary values are converted to byte arrays with Guava's {@code Ints.toByteArray} /
 * {@code Longs.toByteArray} before being stored.
 */
static class DictionaryVarDecimalReader extends FixedByteAlignedReader<VarDecimalVector> {

  DictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
                             ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v,
                             SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Called by the superclass during a read loop. For INT32 and INT64 columns the values are either
   * read from the dictionary value reader or, without dictionary encoding, read by the superclass.
   *
   * @param recordsToReadInThisPass number of records requested for the current pass
   */
  @Override
  protected void readField(long recordsToReadInThisPass) {
    long leftInPage = pageReader.currentPageCount - pageReader.valuesRead;
    long leftInPass = recordsToReadInThisPass - valuesReadInCurrentPass;
    recordsReadInThisIteration = Math.min(leftInPage, leftInPass);

    switch (columnDescriptor.getType()) {
      case INT32:
        if (!usingDictionary) {
          super.readField(recordsToReadInThisPass);
          break;
        }
        for (int idx = 0; idx < recordsReadInThisIteration; idx++) {
          setValueBytes(idx, Ints.toByteArray(pageReader.dictionaryValueReader.readInteger()));
        }
        setWriteIndex();
        break;
      case INT64:
        if (!usingDictionary) {
          super.readField(recordsToReadInThisPass);
          break;
        }
        for (int idx = 0; idx < recordsReadInThisIteration; idx++) {
          setValueBytes(idx, Longs.toByteArray(pageReader.dictionaryValueReader.readLong()));
        }
        setWriteIndex();
        break;
      default:
        // NOTE(review): any other physical type reads nothing here, although
        // recordsReadInThisIteration has already been set above - confirm whether this
        // case should fail fast instead.
        break;
    }
  }

  /**
   * Advances the value buffer's writer index by the number of bytes covered by this iteration.
   * A following page may not be dictionary encoded, in which case the superclass
   * {@code readField} is used, and that method requires the writer index to be set correctly.
   */
  private void setWriteIndex() {
    readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
    // Bits are rounded up to whole bytes.
    readLength = (int) Math.ceil(readLengthInBits / BITS_COUNT_IN_BYTE_DOUBLE_VALUE);
    int currentWriterIndex = valueVec.getBuffer().writerIndex();
    valueVec.getBuffer().setIndex(0, currentWriterIndex + (int) readLength);
  }

  /**
   * Stores a complete byte array as the value at the given position of the current iteration.
   *
   * @param positionInIteration zero-based position relative to {@code valuesReadInCurrentPass}
   * @param valueBytes          bytes of the value to store
   */
  private void setValueBytes(int positionInIteration, byte[] valueBytes) {
    valueVec.getMutator().setSafe(valuesReadInCurrentPass + positionInIteration, valueBytes, 0,
        valueBytes.length);
  }
}
static class DictionaryTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
DictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (usingDictionary) {
recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (usingDictionary) {
recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes();
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true));
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> {
DictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (usingDictionary) {
recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryFloat8Reader extends FixedByteAlignedReader<Float8Vector> {
DictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (usingDictionary) {
recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
}