blob: 416d06c3aac679ed3d1d32ceb95a4413b1979bf8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> {
int nullsRead;
boolean currentValNull = false;
NullableVarLengthValuesColumn(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
@Override
public abstract boolean setSafe(int index, DrillBuf value, int start, int length);
@Override
public abstract int capacity();
@Override
public void reset() {
bytesReadInCurrentPass = 0;
valuesReadInCurrentPass = 0;
nullsRead = 0;
pageReader.valuesReadyToRead = 0;
}
@Override
protected void postPageRead() {
currLengthDeterminingDictVal = null;
pageReader.valuesReadyToRead = 0;
}
@Override
protected boolean readAndStoreValueSizeInformation() {
// we need to read all of the lengths to determine if this value will fit in the current vector,
// as we can only read each definition level once, we have to store the last one as we will need it
// at the start of the next read if we decide after reading all of the varlength values in this record
// that it will not fit in this batch
currentValNull = false;
if ( currDefLevel == -1 ) {
currDefLevel = pageReader.definitionLevels.nextInt();
}
if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel) {
nullsRead++;
// set length of zero, each index in the vector defaults to null so no need to set the nullability
variableWidthVector.getMutator().setValueLengthSafe(
valuesReadInCurrentPass + pageReader.valuesReadyToRead, 0);
currentValNull = true;
return false;// field is null, no length to add to data vector
}
if (usingDictionary) {
if (currLengthDeterminingDictVal == null) {
currLengthDeterminingDictVal = pageReader.getDictionaryLengthDeterminingReader().readBytes();
}
currDecodedValToWrite = currLengthDeterminingDictVal;
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
dataTypeLengthInBits = currLengthDeterminingDictVal.length();
}
else {
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
dataTypeLengthInBits = pageReader.pageData.getInt((int) pageReader.readyToReadPosInBytes);
}
// I think this also needs to happen if it is null for the random access
return ! setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
(int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
}
@Override
public void updateReadyToReadPosition() {
if (! currentValNull) {
pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4;
}
pageReader.valuesReadyToRead++;
currLengthDeterminingDictVal = null;
}
@Override
public void updatePosition() {
if (! currentValNull) {
pageReader.readPosInBytes += dataTypeLengthInBits + 4;
bytesReadInCurrentPass += dataTypeLengthInBits;
}
currentValNull = false;
valuesReadInCurrentPass++;
}
@Override
protected void readField(long recordsToRead) {
// TODO - unlike most implementations of this method, the recordsReadInThisIteration field is not set here
// should verify that this is not breaking anything
currentValNull = variableWidthVector.getAccessor().isNull(valuesReadInCurrentPass);
// again, I am re-purposing the unused field here, it is a length n BYTES, not bits
if (! currentValNull) {
if (usingDictionary) {
currDecodedValToWrite = pageReader.getDictionaryValueReader().readBytes();
}
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageData,
(int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
assert success;
}
updatePosition();
}
}