/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.parquet.columnreaders;

import java.io.IOException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DefLevelReaderWrapper;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class);

protected BaseDataValueVector castedBaseVector;
protected NullableVectorDefinitionSetter castedVectorMutator;
/** The number of values processed so far within the current page */
private int currPageValuesProcessed = 0;
/** Definition level wrapper to handle {@link ValueVector} limitations */
private final DefLevelReaderWrapper definitionLevelWrapper = new DefLevelReaderWrapper();

NullableColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
castedBaseVector = (BaseDataValueVector) v;
castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
}

/** {@inheritDoc} */
@Override
public void processPages(long recordsToReadInThisPass) throws IOException {
if (!parentReader.useBulkReader()) {
processPagesOrig(recordsToReadInThisPass);
} else {
processPagesBulk(recordsToReadInThisPass);
}
}

private final void processPagesOrig(long recordsToReadInThisPass) throws IOException {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
vectorData = castedBaseVector.getBuffer();
// values need to be spaced out where nulls appear in the column
// leaving blank space for nulls allows for random access to values
// to optimize copying data out of the buffered disk stream, runs of defined values
// are located and copied together, rather than copying individual values
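// Worked example (illustrative): for a flat nullable column the max
// definition level is 1, so definition levels [1, 0, 0, 1, 1] describe
// [v0, null, null, v1, v2]. The two nulls are skipped as a run of
// length 2 (leaving two value-width gaps in the buffer), then v1 and v2
// are copied together as a single non-null run of length 2.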
int runLength = -1; // number of non-null records in the current run.
int nullRunLength = -1; // number of consecutive null records in the current run.
int currentDefinitionLevel = -1;
int readCount = 0; // number of records read so far in this pass.
int writeCount = 0; // number of records written to the value vector so far.
// This was previously the indexInOutputVector variable
boolean haveMoreData; // true if we have more data and have not filled the vector
while (readCount < recordsToReadInThisPass && writeCount < valueVec.getValueCapacity()) {
// read a page if needed
if (!pageReader.hasPage()
|| (currPageValuesProcessed >= pageReader.pageValueCount)) {
if (!pageReader.next()) {
break;
}
// New page. Reset the definition level.
currentDefinitionLevel = -1;
currPageValuesProcessed = 0;
recordsReadInThisIteration = 0;
readStartInBytes = 0;
}
nullRunLength = 0;
runLength = 0;
//
// Let's skip the next run of nulls if any ...
//
// If we are reentering this loop, the currentDefinitionLevel has already been read
if (currentDefinitionLevel < 0) {
currentDefinitionLevel = pageReader.definitionLevels.nextInt();
}
haveMoreData = readCount < recordsToReadInThisPass
&& writeCount + nullRunLength < valueVec.getValueCapacity()
&& currPageValuesProcessed < pageReader.pageValueCount;
while (haveMoreData
&& currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()) {
readCount++;
nullRunLength++;
currPageValuesProcessed++;
haveMoreData = readCount < recordsToReadInThisPass
&& writeCount + nullRunLength < valueVec.getValueCapacity()
&& currPageValuesProcessed < pageReader.pageValueCount;
if (haveMoreData) {
currentDefinitionLevel = pageReader.definitionLevels.nextInt();
}
}
//
// Write the nulls if any
//
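// Illustrative arithmetic: for a nullable INT column
// (dataTypeLengthInBits = 32), a null run of length 3 advances the
// writer index by ceil(3 * 32 / 8.0) = 12 bytes, reserving space so
// later values keep their positional (random-access) offsets.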
if (nullRunLength > 0) {
int writerIndex = ((BaseDataValueVector) valueVec).getBuffer().writerIndex();
castedBaseVector.getBuffer().setIndex(0,
writerIndex + (int) Math.ceil(nullRunLength * dataTypeLengthInBits / 8.0));
writeCount += nullRunLength;
valuesReadInCurrentPass += nullRunLength;
recordsReadInThisIteration += nullRunLength;
}
//
// Handle the run of non-null values
//
haveMoreData = readCount < recordsToReadInThisPass
&& writeCount + runLength < valueVec.getValueCapacity() // note: writeCount + runLength
&& currPageValuesProcessed < pageReader.pageValueCount;
while (haveMoreData
&& currentDefinitionLevel >= columnDescriptor.getMaxDefinitionLevel()) {
readCount++;
runLength++;
currPageValuesProcessed++;
// set the nullable bit to indicate a non-null value
castedVectorMutator.setIndexDefined(writeCount + runLength - 1);
haveMoreData = readCount < recordsToReadInThisPass
&& writeCount + runLength < valueVec.getValueCapacity()
&& currPageValuesProcessed < pageReader.pageValueCount;
if (haveMoreData) {
currentDefinitionLevel = pageReader.definitionLevels.nextInt();
}
}
//
// Write the non-null values
//
if (runLength > 0) {
// set up metadata
// This _must_ be set so that the call to readField works correctly for all datatypes
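// Illustrative arithmetic: a run of 5 nullable BIGINT values
// (dataTypeLengthInBits = 64) gives readLengthInBits = 320 and
// readLength = ceil(320 / 8.0) = 40 bytes to copy out of the page.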
this.recordsReadInThisIteration += runLength;
this.readStartInBytes = pageReader.readPosInBytes;
this.readLengthInBits = (long) runLength * dataTypeLengthInBits;
this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
readField(runLength);
writeCount += runLength;
valuesReadInCurrentPass += runLength;
pageReader.readPosInBytes = readStartInBytes + readLength;
}
pageReader.valuesRead += Math.toIntExact(recordsReadInThisIteration);
totalValuesRead += runLength + nullRunLength;
logger.trace("" + "recordsToReadInThisPass: {} \t "
+ "Run Length: {} \t Null Run Length: {} \t readCount: {} \t writeCount: {} \t "
+ "recordsReadInThisIteration: {} \t valuesReadInCurrentPass: {} \t "
+ "totalValuesRead: {} \t readStartInBytes: {} \t readLength: {} \t pageReader.byteLength: {} \t "
+ "currPageValuesProcessed: {} \t pageReader.currentPageCount: {}",
recordsToReadInThisPass, runLength, nullRunLength, readCount,
writeCount, recordsReadInThisIteration, valuesReadInCurrentPass,
totalValuesRead, readStartInBytes, readLength, pageReader.byteLength,
currPageValuesProcessed, pageReader.pageValueCount);
}
valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
}

private final void processPagesBulk(long recordsToReadInThisPass) throws IOException {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
vectorData = castedBaseVector.getBuffer();
// values need to be spaced out where nulls appear in the column
// leaving blank space for nulls allows for random access to values
// to optimize copying data out of the buffered disk stream, runs of defined values
// are located and copied together, rather than copying individual values
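// (Null spacing works exactly as in processPagesOrig above; see the
// worked definition-level example there.)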
int valueCount = 0;
final int maxValuesToProcess = Math.min((int) recordsToReadInThisPass, valueVec.getValueCapacity());
// If a page was already loaded by a previous pass, initialize the definition level reader wrapper from it
if (pageReader.definitionLevels != null && currPageValuesProcessed == 0) {
definitionLevelWrapper.set(pageReader.definitionLevels, pageReader.pageValueCount);
}
while (valueCount < maxValuesToProcess) {
// read a page if needed
if (!pageReader.hasPage() || (currPageValuesProcessed == pageReader.pageValueCount)) {
if (!pageReader.next()) {
break;
}
// New page. Reset the definition level.
currPageValuesProcessed = 0;
recordsReadInThisIteration = 0;
readStartInBytes = 0;
// Update the Definition Level reader
definitionLevelWrapper.set(pageReader.definitionLevels, pageReader.pageValueCount);
}
definitionLevelWrapper.readFirstIntegerIfNeeded();
int numNullValues = 0;
int numNonNullValues = 0;
final int remaining = maxValuesToProcess - valueCount;
int currBatchSz = Math.min(remaining, (pageReader.pageValueCount - currPageValuesProcessed));
assert currBatchSz > 0;
// Let's skip the next run of nulls if any ...
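// Note: the bulk path compares definition levels against the literals 1
// and 0 (rather than columnDescriptor.getMaxDefinitionLevel()), which
// assumes a flat nullable column whose max definition level is 1.
// Illustrative run split: levels [0, 0, 1, 1, 0] yield numNullValues = 2
// here, then numNonNullValues = 2 in the following loop.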
int idx;
for (idx = 0; idx < currBatchSz; ++idx) {
if (definitionLevelWrapper.readCurrInteger() == 1) {
break; // non-null value encountered
}
definitionLevelWrapper.nextIntegerIfNotEOF();
}
numNullValues += idx;
// Write the nulls if any
if (numNullValues > 0) {
int writerIndex = ((BaseDataValueVector) valueVec).getBuffer().writerIndex();
castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math.ceil(numNullValues * dataTypeLengthInBits / 8.0));
// let's update the counters
currBatchSz -= numNullValues;
valuesReadInCurrentPass += numNullValues;
recordsReadInThisIteration += numNullValues;
}
// Let's figure out the number of contiguous non-null values
for (idx = 0; idx < currBatchSz; ++idx) {
if (definitionLevelWrapper.readCurrInteger() == 0) {
break; // null value encountered
}
definitionLevelWrapper.nextIntegerIfNotEOF();
}
numNonNullValues += idx;
//
// Write the non-null values
//
if (numNonNullValues > 0) {
// Mark this run of non-null values as defined in the nullable bits
castedVectorMutator.setIndexDefined(valueCount + numNullValues, numNonNullValues);
// This _must_ be set so that the call to readField works correctly for all datatypes
this.recordsReadInThisIteration += numNonNullValues;
this.readStartInBytes = pageReader.readPosInBytes;
this.readLengthInBits = (long) numNonNullValues * dataTypeLengthInBits;
this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
readField(numNonNullValues);
valuesReadInCurrentPass += numNonNullValues;
pageReader.readPosInBytes = readStartInBytes + readLength;
}
pageReader.valuesRead += Math.toIntExact(recordsReadInThisIteration);
totalValuesRead += numNonNullValues + numNullValues;
currPageValuesProcessed += numNonNullValues + numNullValues;
valueCount += numNonNullValues + numNullValues;
if (logger.isTraceEnabled()) {
logger.trace("" + "recordsToReadInThisPass: {} \t "
+ "Run Length: {} \t Null Run Length: {} \t valueCount: {} \t "
+ "recordsReadInThisIteration: {} \t valuesReadInCurrentPass: {} \t "
+ "totalValuesRead: {} \t readStartInBytes: {} \t readLength: {} \t pageReader.byteLength: {} \t "
+ "currPageValuesProcessed: {} \t pageReader.currentPageCount: {}",
recordsToReadInThisPass, numNonNullValues, numNullValues, valueCount,
recordsReadInThisIteration, valuesReadInCurrentPass,
totalValuesRead, readStartInBytes, readLength, pageReader.byteLength,
currPageValuesProcessed, pageReader.pageValueCount);
}
} // loop-end
valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
}

@Override
protected abstract void readField(long recordsToRead);
}