// blob: f6272cdbb3d9763d4b28f804d311bd7e678eaafe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.DateVector;
import org.apache.drill.exec.vector.IntervalVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.joda.time.DateTimeConstants;
import io.netty.buffer.DrillBuf;
class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
protected DrillBuf bytebuf;
/**
 * Creates a reader; every argument is forwarded unchanged to the
 * {@link ColumnReader} constructor.
 *
 * @throws ExecutionSetupException if the superclass constructor fails
 */
FixedByteAlignedReader(
    ParquetRecordReader parentReader,
    ColumnDescriptor descriptor,
    ColumnChunkMetaData columnChunkMetaData,
    boolean fixedLength,
    V v,
    SchemaElement schemaElement)
    throws ExecutionSetupException {
  super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
/**
 * Computes how many values can be taken from the current page in this
 * iteration, records the byte range that holds them, and hands off to
 * {@link #writeData()}. Called by the superclass during a read loop.
 *
 * @param recordsToReadInThisPass total number of records wanted in the current pass
 */
@Override
protected void readField(long recordsToReadInThisPass) {
  bytebuf = pageReader.pageData;

  // The iteration is limited both by what is left in the page and by what
  // is still missing from the pass.
  final long leftInPage = pageReader.pageValueCount - pageReader.valuesRead;
  final long leftInPass = recordsToReadInThisPass - valuesReadInCurrentPass;
  recordsReadInThisIteration = Math.min(leftInPage, leftInPass);

  readStartInBytes = pageReader.readPosInBytes;
  readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
  // Round the bit length up to whole bytes.
  readLength = (int) Math.ceil(readLengthInBits / 8.0);

  // vectorData is assigned by the superclass read loop method
  writeData();
}
/**
 * Copies the byte range selected by {@code readStartInBytes} and
 * {@code readLength} from {@code bytebuf} into {@code vectorData}.
 */
protected void writeData() {
  final int sourceOffset = (int) readStartInBytes;
  final int byteCount = (int) readLength;
  vectorData.writeBytes(bytebuf, sourceOffset, byteCount);
}
/**
 * Reads fixed-width binary values into a {@link VariableWidthVector}: the
 * bytes are bulk-copied by the parent class and the per-value lengths are
 * then recorded on the vector.
 */
public static class FixedBinaryReader extends FixedByteAlignedReader<VariableWidthVector> {
  // TODO - replace this with fixed binary type in drill
  VariableWidthVector castedVector;

  /**
   * Creates a reader; the arguments are forwarded to the parent constructor
   * with {@code fixedLength} set to {@code true}.
   *
   * @throws ExecutionSetupException if the superclass constructor fails
   */
  FixedBinaryReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
      VariableWidthVector v, SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, true, v, schemaElement);
    castedVector = v;
  }

  /**
   * Transfers the data with the standard read method and then writes the
   * length of each value that was transferred.
   *
   * @param recordsToReadInThisPass total number of records wanted in the current pass
   */
  @Override
  protected void readField(long recordsToReadInThisPass) {
    // we can use the standard read method to transfer the data
    super.readField(recordsToReadInThisPass);
    // TODO - replace this with fixed binary type in drill
    // now we need to write the lengths of each value
    int byteLength = dataTypeLengthInBits / 8;
    // Only the values copied by super.readField() in this iteration get a
    // length. recordsToReadInThisPass is the target for the whole pass, so
    // using it as the bound would write lengths for slots that have not
    // been read yet (and past the end of the batch on later iterations).
    for (int i = 0; i < recordsReadInThisIteration; i++) {
      castedVector.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength);
    }
  }
}
/**
 * Base class for readers that convert each value individually instead of
 * bulk-copying the page bytes into the vector.
 */
public static abstract class ConvertedReader<V extends ValueVector> extends FixedByteAlignedReader<V> {
  protected int dataTypeLengthInBytes;

  /**
   * Creates a reader; every argument is forwarded unchanged to the parent constructor.
   *
   * @throws ExecutionSetupException if the superclass constructor fails
   */
  ConvertedReader(
      ParquetRecordReader parentReader,
      ColumnDescriptor descriptor,
      ColumnChunkMetaData columnChunkMetaData,
      boolean fixedLength,
      V v,
      SchemaElement schemaElement)
      throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Invokes {@link #addNext(int, int)} once per value of the current
   * iteration, stepping the source offset by the value width in bytes and
   * the vector index by one.
   */
  @Override
  public void writeData() {
    // Width of one value, rounded up to whole bytes.
    final int stride = (int) Math.ceil(dataTypeLengthInBits / 8.0);
    dataTypeLengthInBytes = stride;

    int sourceOffset = (int) readStartInBytes;
    int vectorIndex = valuesReadInCurrentPass;
    int processed = 0;
    while (processed < recordsReadInThisIteration) {
      addNext(sourceOffset, vectorIndex);
      sourceOffset += stride;
      vectorIndex++;
      processed++;
    }
  }

  /**
   * Reads from bytebuf, converts, and writes to buffer
   * @param start the index in bytes to start reading from
   * @param index the index of the ValueVector
   */
  abstract void addNext(int start, int index);
}
/**
 * Reads 32-bit integer values, treats each one as a number of days and
 * stores it in a {@link DateVector} as milliseconds.
 */
public static class DateReader extends ConvertedReader<DateVector> {
  private final DateVector.Mutator mutator;

  /**
   * Creates a reader and caches the mutator of the target vector.
   *
   * @throws ExecutionSetupException if the superclass constructor fails
   */
  DateReader(
      ParquetRecordReader parentReader,
      ColumnDescriptor descriptor,
      ColumnChunkMetaData columnChunkMetaData,
      boolean fixedLength,
      DateVector v,
      SchemaElement schemaElement)
      throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
    mutator = v.getMutator();
  }

  /**
   * Converts one value to milliseconds and stores it.
   *
   * @param start the index in bytes to start reading from
   * @param index the index of the ValueVector
   */
  @Override
  void addNext(int start, int index) {
    final int days = recordsRequireDecoding()
        ? readDecodedInt()
        : readIntLittleEndian(bytebuf, start);
    // Widen before multiplying so the product is computed as a long.
    mutator.set(index, days * (long) DateTimeConstants.MILLIS_PER_DAY);
  }

  /** Pulls the next integer from the dictionary or plain value reader of the current page. */
  private int readDecodedInt() {
    final ValuesReader reader = usingDictionary
        ? pageReader.getDictionaryValueReader()
        : pageReader.getValueReader();
    return reader.readInteger();
  }
}
/**
 * Old versions of Drill were writing a non-standard format for date. See DRILL-4203
 * <p>
 * Every value read by this class has
 * {@link ParquetReaderUtility#CORRECT_CORRUPT_DATE_SHIFT} subtracted before
 * it is scaled from days to milliseconds.
 */
public static class CorruptDateReader extends ConvertedReader<DateVector> {
  private final DateVector.Mutator mutator;

  /**
   * Creates a reader and caches the mutator of the target vector.
   *
   * @throws ExecutionSetupException if the superclass constructor fails
   */
  CorruptDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
      boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
    mutator = v.getMutator();
  }

  /**
   * Reads one value, removes the corruption shift and stores the result in milliseconds.
   *
   * @param start the index in bytes to start reading from
   * @param index the index of the ValueVector
   */
  @Override
  void addNext(int start, int index) {
    int intValue;
    if (recordsRequireDecoding()) {
      ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
      intValue = valReader.readInteger();
    } else {
      intValue = readIntLittleEndian(bytebuf, start);
    }
    // Widen to long before any arithmetic, as DateReader does, so neither the
    // subtraction nor the multiplication can overflow an int regardless of
    // how CORRECT_CORRUPT_DATE_SHIFT is declared.
    long correctedDays = (long) intValue - ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT;
    mutator.set(index, correctedDays * DateTimeConstants.MILLIS_PER_DAY);
  }
}
/**
 * Old versions of Drill were writing a non-standard format for date. See DRILL-4203
 * <p>
 * For files that lack enough metadata to determine if the dates are corrupt, we must just
 * correct values when they look corrupt during this low level read.
 */
public static class CorruptionDetectingDateReader extends ConvertedReader<DateVector> {
  private final DateVector.Mutator mutator;

  /**
   * Creates a reader and caches the mutator of the target vector.
   *
   * @throws ExecutionSetupException if the superclass constructor fails
   */
  CorruptionDetectingDateReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
      boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
    mutator = v.getMutator();
  }

  /**
   * Reads one value, removes the corruption shift when the value exceeds
   * {@link ParquetReaderUtility#DATE_CORRUPTION_THRESHOLD}, and stores the
   * result in milliseconds.
   *
   * @param start the index in bytes to start reading from
   * @param index the index of the ValueVector
   */
  @Override
  void addNext(int start, int index) {
    int intValue;
    if (recordsRequireDecoding()) {
      ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
      intValue = valReader.readInteger();
    } else {
      intValue = readIntLittleEndian(bytebuf, start);
    }
    // Work in long from here on so both branches multiply without int
    // overflow, independent of how the shift constant is declared.
    long days = intValue;
    if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
      days -= ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT;
    }
    mutator.set(index, days * DateTimeConstants.MILLIS_PER_DAY);
  }
}
/**
 * Reads 12-byte values as three consecutive 32-bit integers (at byte
 * offsets 0, 4 and 8) and stores them in an {@link IntervalVector}.
 * NOTE(review): presumably months / days / milliseconds as in the Parquet
 * INTERVAL layout - confirm against IntervalVector.Mutator#setSafe.
 */
public static class IntervalReader extends ConvertedReader<IntervalVector> {

  /**
   * Creates a reader; every argument is forwarded unchanged to the parent constructor.
   *
   * @throws ExecutionSetupException if the superclass constructor fails
   */
  IntervalReader(
      ParquetRecordReader parentReader,
      ColumnDescriptor descriptor,
      ColumnChunkMetaData columnChunkMetaData,
      boolean fixedLength,
      IntervalVector v,
      SchemaElement schemaElement)
      throws ExecutionSetupException {
    super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
  }

  /**
   * Stores one interval value, taking it either from the page's value
   * reader or directly from the page buffer.
   *
   * @param start the index in bytes to start reading from
   * @param index the index of the ValueVector
   */
  @Override
  void addNext(int start, int index) {
    if (!recordsRequireDecoding()) {
      // Plain data: the three integers sit back to back in the page buffer.
      final int first = bytebuf.getInt(start);
      final int second = bytebuf.getInt(start + 4);
      final int third = bytebuf.getInt(start + 8);
      valueVec.getMutator().setSafe(index, first, second, third);
      return;
    }

    final ValuesReader reader = usingDictionary
        ? pageReader.getDictionaryValueReader()
        : pageReader.getValueReader();
    final byte[] raw = reader.readBytes().getBytes();
    final int first = ParquetReaderUtility.getIntFromLEBytes(raw, 0);
    final int second = ParquetReaderUtility.getIntFromLEBytes(raw, 4);
    final int third = ParquetReaderUtility.getIntFromLEBytes(raw, 8);
    valueVec.getMutator().setSafe(index, first, second, third);
  }
}
}