blob: 2f18a728ba308ade36b82ce670e25cd745840808 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.Float4Vector;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.TimeVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.UInt8Vector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarDecimalVector;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.api.Binary;
import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
public class ParquetFixedWidthDictionaryReaders {
private static final double BITS_COUNT_IN_BYTE_DOUBLE_VALUE = 8.0;
static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> {
DictionaryIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
/**
* This class is used for reading unsigned integer fields.
*/
static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> {
DictionaryUInt4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
UInt4Vector.Mutator mutator = valueVec.getMutator();
for (int i = 0; i < recordsReadInThisIteration; i++) {
mutator.setSafe(valuesReadInCurrentPass + i, valReader.readInteger());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> {
DictionaryFixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
VarBinaryVector.Mutator mutator = valueVec.getMutator();
Binary currDictValToWrite = null;
for (int i = 0; i < recordsReadInThisIteration; i++){
currDictValToWrite = valReader.readBytes();
mutator.setSafe(valuesReadInCurrentPass + i, currDictValToWrite.toByteBuffer().slice(), 0,
currDictValToWrite.length());
}
} else {
super.readField(recordsToReadInThisPass);
}
// TODO - replace this with fixed binary type in drill
// now we need to write the lengths of each value
int byteLength = dataTypeLengthInBits / 8;
for (int i = 0; i < recordsToReadInThisPass; i++) {
valueVec.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength);
}
}
}
static class DictionaryTimeReader extends FixedByteAlignedReader<TimeVector> {
DictionaryTimeReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryTimeMicrosReader extends FixedByteAlignedReader<TimeVector> {
DictionaryTimeMicrosReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
}
} else {
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
for (int i = 0; i < recordsReadInThisIteration; i++) {
long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (value / 1000));
}
}
}
}
static class DictionaryBigIntReader extends FixedByteAlignedReader<BigIntVector> {
DictionaryBigIntReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
BigIntVector.Mutator mutator = valueVec.getMutator();
for (int i = 0; i < recordsReadInThisIteration; i++){
mutator.setSafe(valuesReadInCurrentPass + i, valReader.readLong());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
/**
* This class uses for reading unsigned BigInt fields.
*/
static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> {
DictionaryUInt8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
UInt8Vector.Mutator mutator = valueVec.getMutator();
for (int i = 0; i < recordsReadInThisIteration; i++) {
mutator.setSafe(valuesReadInCurrentPass + i, valReader.readLong());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryVarDecimalReader extends FixedByteAlignedReader<VarDecimalVector> {
DictionaryVarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarDecimalVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
recordsReadInThisIteration =
Math.min(pageReader.pageValueCount - pageReader.valuesRead,
recordsToReadInThisPass - valuesReadInCurrentPass);
switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsReadInThisIteration; i++) {
byte[] bytes = Ints.toByteArray(valReader.readInteger());
setValueBytes(i, bytes);
}
} else {
for (int i = 0; i < recordsReadInThisIteration; i++) {
byte[] bytes = Ints.toByteArray(pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes));
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
}
}
break;
case INT64:
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsReadInThisIteration; i++) {
byte[] bytes = Longs.toByteArray(valReader.readLong());
setValueBytes(i, bytes);
}
} else {
for (int i = 0; i < recordsReadInThisIteration; i++) {
byte[] bytes = Longs.toByteArray(pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes));
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
}
}
break;
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
VarDecimalVector.Mutator mutator = valueVec.getMutator();
for (int i = 0; i < recordsReadInThisIteration; i++) {
Binary currDictValToWrite = valReader.readBytes();
mutator.setSafe(valuesReadInCurrentPass + i,
currDictValToWrite.toByteBuffer().slice(), 0, currDictValToWrite.length());
}
} else {
for (int i = 0; i < recordsReadInThisIteration; i++) {
int start = (int) readStartInBytes + i * dataTypeLengthInBytes;
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i,
start, start + dataTypeLengthInBytes, pageReader.pageData);
}
}
}
}
private void setValueBytes(int i, byte[] bytes) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, bytes.length);
}
}
static class DictionaryTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
DictionaryTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readLong());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryTimeStampMicrosReader extends FixedByteAlignedReader<TimeStampVector> {
DictionaryTimeStampMicrosReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readLong() / 1000);
}
} else {
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
for (int i = 0; i < recordsReadInThisIteration; i++) {
long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, value / 1000);
}
}
}
}
static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> {
DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
Binary binaryTimeStampValue = valReader.readBytes();
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true));
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> {
DictionaryFloat4Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readFloat());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
static class DictionaryFloat8Reader extends FixedByteAlignedReader<Float8Vector> {
DictionaryFloat8Reader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
recordsReadInThisIteration = Math.min(pageReader.pageValueCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readDouble());
}
} else {
super.readField(recordsToReadInThisPass);
}
}
}
}