blob: 1cf9357ffedacd3ded2d31e71db041f23a7b7b1c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
import io.netty.buffer.DrillBuf;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarLenBulkEntry;
import org.apache.drill.exec.vector.VarLenBulkInput;
import org.apache.parquet.column.values.ValuesReader;
/**
 * Implements the {@link VarLenBulkInput} interface to optimize data copy: variable-length column
 * values are returned in bulk entries, read either directly from Parquet pages or from overflow
 * data left over by a previous batch.
 */
public final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkInput<VarLenBulkEntry> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenColumnBulkInput.class);
/** Cut-off value length used to decide whether a column is worth processing in bulk */
private static final int BULK_PROCESSING_MAX_PREC_LEN = 1 << 10;
/** parent object */
private final VarLengthValuesColumn<V> parentInst;
/** Batch sizer manager */
private final RecordBatchSizerManager batchSizerMgr;
/** Column precision type information (owned by the caller; shared, not copied) */
private final ColumnPrecisionInfo columnPrecInfo;
/** Custom definition level reader (shared with the caller's bulk reader state) */
private final DefLevelReaderWrapper custDefLevelReader;
/** Custom encoded values reader (shared with the caller's bulk reader state) */
private final ValuesReaderWrapper custValuesReader;
/** The records to read */
private final int recordsToRead;
/** Maximum Memory (in bytes) to use for loading this field's data
* (soft limit as a single row can go beyond this value); refreshed on each batch constraint check
*/
private ColumnMemoryQuota columnMemoryQuota;
/** Current operation bulk reader state */
private final OprBulkReadState oprReadState;
/** Container class for holding page data information */
private final PageDataInfo pageInfo = new PageDataInfo();
/** Page payload reader which produces the bulk entries; created on first use */
private VarLenBulkPageReader buffPagePayload;
/** A callback to allow child readers to interact with this class */
private final VarLenColumnBulkInputCallback callback;
/** Column memory usage information */
private final ColumnMemoryUsageInfo columnMemoryUsage = new ColumnMemoryUsageInfo();
/** A reference to column's overflow data (could be null; cleared by deinitOverflowData()) */
private FieldOverflowStateContainer fieldOverflowStateContainer;
/**
 * Creates a bulk input over the values of {@code parentInst}'s column, resuming from the parent page
 * reader's current position.
 *
 * @param parentInst parent object instance
 * @param recordsToRead number of records to read
 * @param bulkReaderState bulk reader state; its precision info and reader wrappers are shared with
 *        (not copied into) this object
 * @throws IOException in case of a processing error
 */
VarLenColumnBulkInput(VarLengthValuesColumn<V> parentInst,
int recordsToRead, BulkReaderState bulkReaderState) throws IOException {
this.parentInst = parentInst;
this.batchSizerMgr = this.parentInst.parentReader.getBatchSizesMgr();
this.recordsToRead = recordsToRead;
this.callback = new VarLenColumnBulkInputCallback(this);
this.columnPrecInfo = bulkReaderState.columnPrecInfo;
this.custDefLevelReader = bulkReaderState.definitionLevelReader;
this.custValuesReader = bulkReaderState.encodedValuesReader;
// Overflow data for this field (possibly left over from a previous batch); may be null
this.fieldOverflowStateContainer = this.batchSizerMgr.getFieldOverflowContainer(parentInst.valueVec.getField().getName());
// Load page if none have been read
// Create the internal READ_STATE object based on the current page-reader state (byte position and
// number of values already read from the current page)
this.oprReadState = new OprBulkReadState(parentInst.pageReader.readyToReadPosInBytes, parentInst.pageReader.valuesRead);
// Let's try to figure out whether this column is fixed or variable length; this information
// is not always accurate within the Parquet schema metadata. The result is cached in the shared
// bulk reader state, so this check only applies while the precision is still unknown.
if (ColumnPrecisionType.isPrecTypeUnknown(columnPrecInfo.columnPrecisionType)) {
// Initialize the buffered-page-payload object
/**
 * {@inheritDoc}
 *
 * <p>Returns false when a batch constraint (record count or memory quota) has been reached, or when
 * no overflow data is pending and every value of the column chunk has already been read. An
 * {@link IOException} raised inside this method is rethrown as a {@link DrillRuntimeException}.
 */
public boolean hasNext() {
try {
if (!batchConstraintsReached()) {
// If there is overflow data, then proceed; otherwise, make sure there is still Parquet data to be
// read.
if (!overflowDataAvailable()) {
// We need to ensure there is a page of data to be read: either no page is loaded yet or the
// current page has been fully processed
if (!parentInst.pageReader.hasPage() || parentInst.pageReader.pageValueCount == oprReadState.numPageFieldsProcessed) {
long totalValueCount = parentInst.columnChunkMetaData.getValueCount();
// All of the column chunk's values have been consumed (previous batches plus this one)
// NOTE(review): the second operand of this condition looks truncated; confirm against VCS
if (totalValueCount == (parentInst.totalValuesRead + oprReadState.batchNumValuesReadFromPages)
|| ! {
return false;
// Reset the state object page read metadata
oprReadState.numPageFieldsProcessed = 0;
oprReadState.pageReadPos = parentInst.pageReader.readyToReadPosInBytes;
// Update the value readers information
// Update the buffered-page-payload since we've read a new page
// Alright, we didn't hit a batch constraint and are able to read either from the overflow data or
// the Parquet data.
return true;
} else {
return false;
} catch (IOException ie) {
throw new DrillRuntimeException(ie);
/** {@inheritDoc} */
public final VarLenBulkEntry next() {
final int remaining = getRemainingRecords();
final VarLenColumnBulkEntry result = buffPagePayload.getEntry(remaining);
// Update position for next read
if (result != null && result.getNumValues() > 0) {
// We need to update page stats only when we are reading directly from Parquet pages; there are
// situations where we have to return the overflow data (read in a previous batch)
if (result.isReadFromPage()) {
// Page read position is meaningful only when dictionary mode is off
if (!pageInfo.encodedValueReader.isDefined()) {
oprReadState.pageReadPos += (result.getTotalLength() + 4 * result.getNumNonNullValues());
oprReadState.numPageFieldsProcessed += result.getNumValues();
oprReadState.batchNumValuesReadFromPages += result.getNumValues();
// Update the batch field index
oprReadState.batchFieldIndex += result.getNumValues();
return result;
/** {@inheritDoc} */
public final void remove() {
throw new UnsupportedOperationException();
/** {@inheritDoc} */
public final int getStartIndex() {
return oprReadState.batchFieldIndex;
/**
 * {@inheritDoc}
 *
 * <p>Writes this operation's read state back into the parent page reader and column so that the
 * next bulk read resumes where this one left off.
 */
public final void done() {
// Update the page reader state so that a future bulk read resumes
// where we left off.
// Page read position is meaningful only when dictionary mode is off; the encoded values reader
// may also be null if no page payload was ever set up
if (pageInfo.encodedValueReader == null
|| !pageInfo.encodedValueReader.isDefined()) {
parentInst.pageReader.readyToReadPosInBytes = oprReadState.pageReadPos;
parentInst.pageReader.valuesRead = oprReadState.numPageFieldsProcessed;
// Only values read from pages (not from overflow data) advance the column's total
parentInst.totalValuesRead += oprReadState.batchNumValuesReadFromPages;
if (logger.isDebugEnabled()) {
String message = String.format("requested=%d, returned=%d, total-returned=%d",
* @return minimum memory size required to process a variable column in a columnar manner
public static int getMinVLColumnMemorySize() {
// How did we come up with this number?
// Let's first lay down some facts
// a) the allocator-rounding-to-next-power-of-two has to be accounted for
// b) VL columns use up to three vectors "bits", "offsets", and "values"
// c) The maximum number of entries per chunk is chunk-size / 4
// d) "data" and "length" sizes within a chunk have a reverse relationship (if one grows, then the other shrinks)
// e) "data" and "length" within a chunk cannot exceed 1 chunk-size each when loaded into the ValueVector
// This information gives the following upper bound
// - max-chunk-vv-footprint < (chunk-size/4 + 1chunk-size + 1/2 chunk-size) < 2 chunk-sizes
// Why?
// - max-bits footprint is controlled by c)
// - "data" and "offsets" can have a max footprint of 1 chunk size when they are over chunk-size/2 since
// roundup happens; if one goes beyond chunk-size/2, then the other is less than that (inverse relationship)
// which leads to a maximum memory footprint of 1 chunk-size + 1/2 chunk-size
return VarLenBulkPageReader.BUFF_SZ * 2;
final int getReadBatchFields() {
return oprReadState.batchFieldIndex;
/**
 * Points the custom definition-level reader (and the encoded values reader) at the page the parent
 * page reader has just loaded; when that page holds no values, the definition-level reader is reset
 * to an empty state.
 */
private final void setValuesReadersOnNewPage() {
PageReader pageReader = parentInst.pageReader;
if (pageReader.pageValueCount > 0) {
// The definition-level wrapper may read at most pageValueCount levels from this page
custDefLevelReader.set(pageReader.definitionLevels, pageReader.pageValueCount);
// Encoded pages are read through the dictionary or plain values reader
if (parentInst.recordsRequireDecoding()) {
? pageReader.getDictionaryValueReader()
: pageReader.getValueReader()
} else {
} else {
custDefLevelReader.set(null, 0);
/**
 * (Re)configures the page payload reader. When the current page still has unprocessed values,
 * {@code pageInfo} is refreshed from the page reader and the current read state and handed to the
 * payload reader (which is created on first use). Otherwise, a payload reader without page data is
 * created if none exists yet, so that overflow data can still be served.
 */
private final void setBufferedPagePayload() {
if (parentInst.pageReader.hasPage() && oprReadState.numPageFieldsProcessed < parentInst.pageReader.pageValueCount) {
// Raw page buffer coordinates are only relevant when values are not dictionary encoded
if (!parentInst.usingDictionary) {
pageInfo.pageData = parentInst.pageReader.pageData;
pageInfo.pageDataOff = (int) oprReadState.pageReadPos;
pageInfo.pageDataLen = (int) parentInst.pageReader.byteLength;
pageInfo.numPageValues = parentInst.pageReader.pageValueCount;
pageInfo.definitionLevels = custDefLevelReader;
pageInfo.encodedValueReader = custValuesReader;
pageInfo.numPageFieldsRead = oprReadState.numPageFieldsProcessed;
if (buffPagePayload == null) {
buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback, fieldOverflowStateContainer);
} else {
buffPagePayload.set(pageInfo, true);
} else {
if (buffPagePayload == null) {
buffPagePayload = new VarLenBulkPageReader(null, columnPrecInfo, callback, fieldOverflowStateContainer);
final ColumnPrecisionInfo getColumnPrecisionInfo() {
return columnPrecInfo;
/** Reads a data sample to evaluate this column's precision (variable or fixed); this is best effort, caller
* should be ready to handle false positives.
* @param columnPrecInfo input/output precision info container
* @throws IOException in case of a processing error
*/
private final void guessColumnPrecision(ColumnPrecisionInfo columnPrecInfo) throws IOException {
// Default to variable precision until the sample suggests otherwise
columnPrecInfo.columnPrecisionType = ColumnPrecisionType.DT_PRECISION_IS_VARIABLE;
// Minimum number of values within a data size to consider bulk processing
final int minNumVals = VarLenBulkPageReader.BUFF_SZ / BULK_PROCESSING_MAX_PREC_LEN;
// Sample at most one chunk plus the 4-byte length prefixes of minNumVals values, without going
// past the unread remainder of the current page
final int maxDataToProcess =
Math.min(VarLenBulkPageReader.BUFF_SZ + 4 * minNumVals,
(int) (parentInst.pageReader.byteLength-parentInst.pageReader.readyToReadPosInBytes));
if (parentInst.recordsRequireDecoding() || maxDataToProcess == 0) {
// The number of values is small, there are lot of null values, or dictionary encoding is used. Bulk
// processing should work fine for these use-cases
columnPrecInfo.bulkProcess = true;
// Copy the sample out of the page buffer, starting at the current read position
ByteBuffer buffer = ByteBuffer.allocate(maxDataToProcess);
parentInst.pageReader.pageData.getBytes((int) parentInst.pageReader.readyToReadPosInBytes, buffer.array(), 0, maxDataToProcess);
// numValues presumably counts the sampled values (its update is not visible here - confirm)
int numValues = 0;
int fixedDataLen = -1;
boolean isFixedPrecision = false;
// Scan the sample as [4-byte length][value bytes] entries and track whether all lengths match
do {
if (buffer.remaining() < 4) {
int data_len = buffer.getInt();
if (fixedDataLen < 0) {
fixedDataLen = data_len;
isFixedPrecision = true;
if (isFixedPrecision && fixedDataLen != data_len) {
isFixedPrecision = false;
// NOTE(review): a negative data_len (corrupt data) is not rejected; buffer.position() would then
// move backwards or throw IllegalArgumentException - consider validating the length
if (buffer.remaining() < data_len) {
buffer.position(buffer.position() + data_len);
} while (true);
// We need to have encountered at least a couple of values with the same length; if the values
// have long length, then fixed vs VL is not a big deal with regard to performance.
if (isFixedPrecision && fixedDataLen >= 0) {
columnPrecInfo.columnPrecisionType = ColumnPrecisionType.DT_PRECISION_IS_FIXED;
columnPrecInfo.precision = fixedDataLen;
columnPrecInfo.bulkProcess = true;
} else {
columnPrecInfo.columnPrecisionType = ColumnPrecisionType.DT_PRECISION_IS_VARIABLE;
columnPrecInfo.bulkProcess = false;
} else {
// At this point we know this column is variable length; we need to figure out whether it is worth
// processing it in a bulk-manner or not.
if (numValues >= minNumVals) {
columnPrecInfo.bulkProcess = true;
} else {
columnPrecInfo.bulkProcess = false;
/**
 * Makes sure the parent page reader has a current page to read from.
 *
 * @throws IOException in case of a processing error
 */
private void loadPageIfNeed() throws IOException {
if (!parentInst.pageReader.hasPage()) {
// load a page;
// update the definition level information
/**
 * Refreshes this column's memory quota from the batch sizer, then checks whether processing one more
 * chunk would violate a batch constraint. The chunk is sized for the worst case of
 * {@code BUFF_SZ / INT_VALUE_WIDTH} entries.
 *
 * @return true if a batch constraint has been reached; false otherwise
 */
private boolean batchConstraintsReached() {
// Let's update this column's memory quota
columnMemoryQuota = batchSizerMgr.getCurrentFieldBatchMemory(parentInst.valueVec.getField().getName());
assert columnMemoryQuota.getMaxMemoryUsage() > 0;
// Now try to figure out whether the next chunk will take us beyond the memory quota
final int maxNumRecordsInChunk = VarLenBulkPageReader.BUFF_SZ / BatchSizingMemoryUtil.INT_VALUE_WIDTH;
// Only nullable columns need room for the "bits" vector
if (this.parentInst.valueVec.getField().isNullable()) {
return batchConstraintsReached(
maxNumRecordsInChunk * BatchSizingMemoryUtil.BYTE_VALUE_WIDTH, // max "bits" space within a chunk
maxNumRecordsInChunk * BatchSizingMemoryUtil.INT_VALUE_WIDTH, // max "offsets" space within a chunk
VarLenBulkPageReader.BUFF_SZ // max "data" space within a chunk
} else {
return batchConstraintsReached(
maxNumRecordsInChunk * BatchSizingMemoryUtil.INT_VALUE_WIDTH, // max "offsets" space within a chunk
VarLenBulkPageReader.BUFF_SZ // max "data" space within a chunk
private boolean batchConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
assert oprReadState.batchFieldIndex <= recordsToRead; // cannot read beyond the batch size
// Did we reach the batch size limit?
if (oprReadState.batchFieldIndex == recordsToRead) {
return true; // batch size reached
// Memory constraint check logic:
// - if this is the first chunk to process, then let it proceed as we need to at least return
// one row; this also means the minimum batch memory shouldn't be lower than 2 chunks (please refer
// to getMinVLColumnMemorySize() method for more information).
// - Otherwise, we make sure that the memory growth after processing a chunk cannot go beyond the maximum
// batch memory for this column
// - There is also a caveat that needs to be handled during processing:
// o The page-bulk-reader will stop loading entries if it encounters a large value (doesn't fit within
// the chunk)
// o There is an exception though, which is if the entry is the first one within the batch (this is to
// ensure that we always make progress)
// o In this situation a callback to this object is made to assess whether this large entry can be loaded
// into the ValueVector.
// Is this the first chunk to be processed?
if (oprReadState.batchFieldIndex == 0) {
return false; // we should process at least one chunk
// Is the next processed chunk going to cause memory to overflow beyond the allowed limit?
columnMemoryUsage.vector = parentInst.valueVec;
columnMemoryUsage.memoryQuota = columnMemoryQuota;
columnMemoryUsage.currValueCount = oprReadState.batchFieldIndex;
// Return true if we cannot add this new payload
return !BatchSizingMemoryUtil.canAddNewData(columnMemoryUsage, newBitsMemory, newOffsetsMemory, newDataMemory);
private int getRemainingRecords() {
// remaining records to return within this batch
final int toReadRemaining = recordsToRead - oprReadState.batchFieldIndex;
final int remainingOverflowData = getRemainingOverflowData();
final int remaining;
// This method remainder semantic depends on whether we are dealing with page data or
// overflow data; now that overflow data is behaving like a source of input
if (remainingOverflowData == 0) {
final int pageRemaining = parentInst.pageReader.pageValueCount - oprReadState.numPageFieldsProcessed;
remaining = Math.min(toReadRemaining, pageRemaining);
} else {
remaining = Math.min(toReadRemaining, remainingOverflowData);
return remaining;
private boolean overflowDataAvailable() {
return getRemainingOverflowData() > 0;
private int getRemainingOverflowData() {
if (fieldOverflowStateContainer != null) {
FieldOverflowStateImpl overflowState =
(FieldOverflowStateImpl) fieldOverflowStateContainer.overflowState;
if (overflowState != null) {
return overflowState.getRemainingOverflowData();
} else {
// This can happen if this is the first time we are accessing this container as
// the overflow reader didn't have the chance consume any overflow data yet.
return fieldOverflowStateContainer.overflowDef.numValues;
return 0;
private void deinitOverflowData() {
fieldOverflowStateContainer = null;
// --------------------------------------------------------------------------
// Inner Classes
// --------------------------------------------------------------------------
/** Enumeration which indicates whether a column's type precision is unknown, variable, or fixed. */
enum ColumnPrecisionType {
/** @return true if {@code type} is {@code DT_PRECISION_UNKNOWN}; false otherwise, including for null */
static boolean isPrecTypeUnknown(ColumnPrecisionType type) {
return DT_PRECISION_UNKNOWN.equals(type);
/** @return true if {@code type} is {@code DT_PRECISION_IS_FIXED}; false otherwise, including for null */
static boolean isPrecTypeFixed(ColumnPrecisionType type) {
return DT_PRECISION_IS_FIXED.equals(type);
/** @return true if {@code type} is {@code DT_PRECISION_IS_VARIABLE}; false otherwise, including for null */
static boolean isPrecTypeVariable(ColumnPrecisionType type) {
return DT_PRECISION_IS_VARIABLE.equals(type);
/** this class enables us to cache state across bulk reader operations */
final static class BulkReaderState {
/** Column Precision Type: variable or fixed length; used to overcome unreliable meta-data information */
final ColumnPrecisionInfo columnPrecInfo = new ColumnPrecisionInfo();
* A custom definition level reader which overcomes Parquet's ValueReader limitations (that is,
* no ability to peek)
final DefLevelReaderWrapper definitionLevelReader = new DefLevelReaderWrapper();
* A custom values reader which overcomes Parquet's ValueReader limitations (that is,
* no ability to peek)
final ValuesReaderWrapper encodedValuesReader = new ValuesReaderWrapper();
/** Container class to hold a column precision information */
final static class ColumnPrecisionInfo {
/** column precision type */
ColumnPrecisionType columnPrecisionType = ColumnPrecisionType.DT_PRECISION_UNKNOWN;
/** column precision; set only for fixed length precision */
int precision;
/** indicator on whether this column should be bulk processed */
boolean bulkProcess;
/** Copies source content into this object */
void clone(final ColumnPrecisionInfo src) {
columnPrecisionType = src.columnPrecisionType;
precision = src.precision;
bulkProcess = src.bulkProcess;
/** Contains information about current bulk read operation */
private final static class OprBulkReadState {
/** reader position within current page */
long pageReadPos;
/** number of fields processed within the current page */
int numPageFieldsProcessed;
/** field index within current batch */
int batchFieldIndex;
/** number of values actually read from Parquet pages (not from overflow data) within this batch */
int batchNumValuesReadFromPages;
OprBulkReadState(long pageReadPos, int numPageFieldsRead) {
this.pageReadPos = pageReadPos;
this.numPageFieldsProcessed = numPageFieldsRead;
this.batchFieldIndex = 0;
this.batchNumValuesReadFromPages = 0;
/** Container class for holding page data information */
final static class PageDataInfo {
/** Number of values within the current page */
int numPageValues;
/** Page data buffer */
DrillBuf pageData;
/** Offset within the page data */
int pageDataOff;
/** Page data length */
int pageDataLen;
/** number of fields read within current page */
int numPageFieldsRead;
/** Definition Level */
DefLevelReaderWrapper definitionLevels;
/** Encoded value reader */
ValuesReaderWrapper encodedValueReader;
/** Callback to allow a bulk reader to interact with its parent */
final static class VarLenColumnBulkInputCallback {
/** Parent instance */
final VarLenColumnBulkInput<? extends ValueVector> parentInst;
/** Page reader object (the parent column's page reader, captured at construction) */
PageReader pageReader;
VarLenColumnBulkInputCallback(VarLenColumnBulkInput<? extends ValueVector> parentInst) {
this.parentInst = parentInst;
this.pageReader = this.parentInst.parentInst.pageReader;
/**
* Enables Parquet column readers to reset the definition level reader to a specific state.
* @param skipCount the number of rows to skip (optional)
* @throws IOException in case of a processing error
*/
void resetDefinitionLevelReader(int skipCount) throws IOException {
/**
* @return the page reader's definition-level iterator for the current page
*/
PageReader.IntIterator getDefinitionLevelsReader() {
return pageReader.definitionLevels;
/**
* Lets the page bulk reader ask the parent whether a payload of the given size still fits in the
* current batch; delegates to the parent's batch constraint check.
* @param newBitsMemory new "bits" memory size
* @param newOffsetsMemory new "offsets" memory size
* @param newDataMemory new "data" memory size
* @return true if the new payload ("bits", "offsets", "data") will trigger a constraint violation; false
* otherwise
*/
boolean batchMemoryConstraintsReached(int newBitsMemory, int newOffsetsMemory, int newDataMemory) {
return parentInst.batchConstraintsReached(newBitsMemory, newOffsetsMemory, newDataMemory);
/** Informs the parent the overflow data cannot be used anymore */
void deinitOverflowData() {
/** A wrapper value reader with the ability to control when to read the next value */
final static class DefLevelReaderWrapper {
/** Definition Level */
private PageReader.IntIterator definitionLevels;
/** Peeked value */
private int currValue;
/** Remaining values */
private int remaining;
* @return true if the current page has definition levels to be read
public boolean hasDefinitionLevels() {
return definitionLevels != null;
* Consume the first integer if not done; we want to empower the caller so to avoid extra checks
* during access methods (e.g., some consumers will not invoke this method as they rather access
* the raw reader..)
public void readFirstIntegerIfNeeded() {
assert definitionLevels != null;
if (currValue == -1) {
* Set the {@link PageReader#definitionLevels} object; if a null value is passed, then it is understood
* the current page doesn't have definition levels to be processed
* @param definitionLevels {@link ValuesReader} object
* @param numValues total number of values that can be read from the stream
void set(PageReader.IntIterator definitionLevels, int numValues) {
this.definitionLevels = definitionLevels;
this.currValue = -1;
this.remaining = numValues;
* @return the current integer from the page; this method has no side-effects (the underlying
* {@link ValuesReader} is not affected)
public int readCurrInteger() {
assert currValue >= 0;
return currValue;
* @return internally reads the next integer from the underlying {@link ValuesReader}; false if the stream
* reached EOF
public boolean nextIntegerIfNotEOF() {
return setNextInteger();
* @return underlying reader object; this object is now unusable
* note that you have to invoke the {@link #set(PageReader.IntIterator, int)} method
* to update this object state in case a) you have used the {@link PageReader.IntIterator} object and b)
* want to resume using this {@link DefLevelReaderWrapper} object instance
public PageReader.IntIterator getUnderlyingReader() {
currValue = -1; // to make this object unusable
return definitionLevels;
private boolean setNextInteger() {
if (remaining > 0) {
try {
currValue = definitionLevels.nextInt();
} catch (Exception e) {
throw new RuntimeException(e);
return true;
currValue = -1;
return false;
/** A wrapper values reader with the ability to push a value back so that the next read returns it */
final static class ValuesReaderWrapper {
/** Encoded values reader; null when the current page does not use an encoded values reader */
private ValuesReader valuesReader;
/** Pushed back value; returned (and cleared) by the next getEntry() call */
private Binary pushedBackValue;
* @return true if the current page uses an encoded values reader for the data
public boolean isDefined() {
return valuesReader != null;
* Set the ValuesReader object; if a null value is passed, then it is understood
* the current page doesn't use an encoding like dictionary or delta.
* @param _rawReader {@link ValuesReader} object
void set(ValuesReader _rawReader) {
this.valuesReader = _rawReader;
this.pushedBackValue = null;
* @return the current entry from the page
public Binary getEntry() {
Binary entry = null;
if (pushedBackValue == null) {
entry = getNextEntry();
} else {
entry = pushedBackValue;
pushedBackValue = null;
return entry;
public void pushBack(Binary entry) {
pushedBackValue = entry;
/** Reads the next value from the underlying values reader, wrapping any failure in a RuntimeException. */
private Binary getNextEntry() {
try {
return valuesReader.readBytes();
} catch (Exception e) {
throw new RuntimeException(e);