/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;

import java.io.IOException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
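
/**
 * Reads fixed-width values out of a repeated (list) column. Definition and repetition levels
 * are scanned with the machinery inherited from {@link VarLengthColumn}, while the actual
 * fixed-width data copy is delegated to a nested {@link ColumnReader} that shares this
 * reader's page reader.
 */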
public class FixedWidthRepeatedReader extends VarLengthColumn<RepeatedValueVector> {
ColumnReader<?> dataReader;
int dataTypeLengthInBytes;
// we can do a vector copy of the data once we figure out how much we need to copy;
// this tracks the number of values to transfer (the dataReader translates this into a number
// of bytes to transfer, reusing the code from the non-repeated types)
int valuesToRead;
int repeatedGroupsReadInCurrentPass;
int repeatedValuesInCurrentList;
// empty lists are denoted by definition levels; to stop reading at the correct time we must keep
// track of the number of empty lists as well as the combined length of all of the defined lists
int definitionLevelsRead;
// parquet currently does not restrict lists from reaching across pages for repeated values; this
// necessitates tracking when it happens, so that we can suspend some of the state updates until we
// know the full length of the repeated value for the current record
boolean notFinishedReadingList;
byte[] leftOverBytes;
FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader<?> dataReader, int dataTypeLengthInBytes,
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength,
RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
this.dataTypeLengthInBytes = dataTypeLengthInBytes;
this.dataReader = dataReader;
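// the delegate must decode data from the same pages this reader reads levels from, so drop its
// own page reader and share ours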
this.dataReader.pageReader.clear();
this.dataReader.pageReader = this.pageReader;
// this is not in the reset method because it needs to be initialized only for the very first page read;
// in all other cases, if a read ends at a page boundary we need to carry this flag over and not
// clear it at the start of the next read loop
notFinishedReadingList = false;
}
@Override
public void reset() {
bytesReadInCurrentPass = 0;
valuesReadInCurrentPass = 0;
pageReader.valuesReadyToRead = 0;
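// re-acquire the data vector's backing buffer; it may have been reallocated since the last pass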
dataReader.vectorData = BaseDataValueVector.class.cast(valueVec.getDataVector()).getBuffer();
dataReader.valuesReadInCurrentPass = 0;
repeatedGroupsReadInCurrentPass = 0;
}
@Override
public int getRecordsReadInCurrentPass() {
return repeatedGroupsReadInCurrentPass;
}
@Override
protected void readField(long recordsToRead) {
// intentionally a no-op: this reader drives its reads through readRecords() and
// readAndStoreValueSizeInformation() rather than through readField()
}
@Override
public boolean skipReadyToReadPositionUpdate() {
return false;
}
@Override
public void updateReadyToReadPosition() {
valuesToRead += repeatedValuesInCurrentList;
pageReader.valuesReadyToRead += repeatedValuesInCurrentList;
repeatedGroupsReadInCurrentPass++;
currDictVal = null;
if ( ! notFinishedReadingList) {
repeatedValuesInCurrentList = -1;
}
}
@Override
public void updatePosition() {
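// note: despite its name, dataTypeLengthInBits holds the BYTE length of the current list here
// (see readAndStoreValueSizeInformation), so these positions advance in bytes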
pageReader.readPosInBytes += dataTypeLengthInBits;
bytesReadInCurrentPass += dataTypeLengthInBits;
valuesReadInCurrentPass++;
}
@Override
public void hitRowGroupEnd() {
pageReader.valuesReadyToRead = 0;
definitionLevelsRead = 0;
}
@Override
public void postPageRead() {
super.postPageRead();
// lists can reach across pages, so the current list length may only be reset once we have
// actually finished reading the list
if ( ! notFinishedReadingList) {
repeatedValuesInCurrentList = -1;
}
definitionLevelsRead = 0;
}
@Override
protected int totalValuesReadAndReadyToReadInPage() {
// we need to prevent the page reader from getting rid of the current page in the case where we have a repeated
// value split across a page boundary
if (notFinishedReadingList) {
return definitionLevelsRead - repeatedValuesInCurrentList;
}
return definitionLevelsRead;
}
@Override
protected boolean checkVectorCapacityReached() {
boolean doneReading = super.checkVectorCapacityReached();
if (doneReading) {
return true;
}
return valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity();
}
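// scans the definition and repetition levels for the next list to determine how many values it
// holds, then records that size in the offset vector; returns true when the list spills across a
// page boundary and the batch has to be cut off at the previous value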
@Override
protected boolean readAndStoreValueSizeInformation() {
int numLeftoverVals = 0;
if (notFinishedReadingList) {
numLeftoverVals = repeatedValuesInCurrentList;
readRecords(numLeftoverVals);
notFinishedReadingList = false;
pageReader.valuesReadyToRead = 0;
try {
boolean stopReading = readPage();
if (stopReading) {
// hit the end of a row group
return false;
}
} catch (IOException e) {
throw new RuntimeException("Unexpected error reading parquet repeated column.", e);
}
}
if ( currDefLevel == -1 ) {
currDefLevel = pageReader.definitionLevels.nextInt();
definitionLevelsRead++;
}
int repLevel;
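// worked example, assuming a max definition level of 1: the rows [a, b], [], [c] are encoded as
// four (def, rep) level pairs:
// values: a b - c
// def: 1 1 0 1 (a definition level below the max marks an empty list)
// rep: 0 1 0 0 (0 starts a new record, >0 continues the current list)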
if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel) {
if (repeatedValuesInCurrentList == -1 || notFinishedReadingList) {
repeatedValuesInCurrentList = 1;
do {
repLevel = pageReader.repetitionLevels.nextInt();
if (repLevel > 0) {
repeatedValuesInCurrentList++;
currDefLevel = pageReader.definitionLevels.nextInt();
definitionLevelsRead++;
// we hit the end of this page without confirmation that we reached the end of the current record
if (definitionLevelsRead == pageReader.pageValueCount) {
// check that we have not hit the end of the row group (in which case there is no next page where we
// could find the repetition level that marks the end of this record; we have read all of the values
// in this repetition, so it is okay to add it to the read)
if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()) {
notFinishedReadingList = true;
// if we hit this case, we cut off the current batch at the previous value; these extra values, as
// well as those that spill into the next page, will be added to the next batch
return true;
}
}
}
} while (repLevel != 0);
}
} else {
repeatedValuesInCurrentList = 0;
}
// setSafe grows the offset vector as needed, so this write should not fail
final UInt4Vector offsets = valueVec.getOffsetVector();
offsets.getMutator().setSafe(repeatedGroupsReadInCurrentPass + 1, offsets.getAccessor().get(repeatedGroupsReadInCurrentPass));
// this field is referenced in the superclass determineSize method, so it must be set here; despite
// its name, for this reader it holds the length in BYTES to avoid repetitive multiplication/division
dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;
return false;
}
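// copies the fixed-width data for the given number of values through the nested data reader and
// updates the value counts on both the outer (list) vector and the inner data vector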
@Override
protected void readRecords(int valuesToRead) {
if (valuesToRead == 0) {
return;
}
// TODO - validate that this works in all cases; it fixes a bug when reading from multiple pages into
// a single vector
dataReader.valuesReadInCurrentPass = 0;
dataReader.readValues(valuesToRead);
valuesReadInCurrentPass += valuesToRead;
valueVec.getMutator().setValueCount(repeatedGroupsReadInCurrentPass);
valueVec.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass);
}
@Override
public int capacity() {
return BaseDataValueVector.class.cast(valueVec.getDataVector()).getBuffer().capacity();
}
@Override
public void clear() {
super.clear();
dataReader.clear();
}
}