blob: 06e2b7924cfb76352e8034784e265799032eac5a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.BatchSizingMemoryUtil;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
/**
* This class is responsible for processing serialized overflow data (generated in a previous batch); this way
* overflow data becomes an input source and is thus a) efficiently re-loaded into the current
* batch ValueVector and b) subjected to the same batching constraints rules.
*/
public final class VarLenOverflowReader extends VarLenAbstractEntryReader {
  /** Container holding both the overflow definition and its (mutable) consumption state */
  private final FieldOverflowStateContainer fieldOverflowContainer;
  /** Whether the overflowed column is nullable; drives the serialized buffer layout */
  private final boolean isNullable;
  /** Tracks how much of the overflow data has already been consumed / committed */
  private final FieldOverflowStateImpl overflowState;

  /**
   * CTOR.
   * @param buffer byte buffer for data buffering (within CPU cache)
   * @param entry reusable bulk entry object
   * @param containerCallback container callback
   * @param fieldOverflowContainer field overflow container
   */
  VarLenOverflowReader(ByteBuffer buffer,
    VarLenColumnBulkEntry entry,
    VarLenColumnBulkInputCallback containerCallback,
    FieldOverflowStateContainer fieldOverflowContainer) {

    super(buffer, entry, containerCallback);

    this.fieldOverflowContainer = fieldOverflowContainer;
    this.isNullable = fieldOverflowContainer.overflowDef.field.isNullable();

    // Initialize the overflow state object (no-op if a previous reader already did)
    initOverflowStateIfNeeded();

    // By now the overflow state object should be initialized
    overflowState = (FieldOverflowStateImpl) fieldOverflowContainer.overflowState;
  }

  /** {@inheritDoc} */
  @Override
  VarLenColumnBulkEntry getEntry(int valuesToRead) {
    if (getRemainingOverflowData() == 0) {
      return null; // overflow data fully committed
    }

    final int[] valueLengths = entry.getValuesLength();
    final FieldOverflowDefinition overflowDef = fieldOverflowContainer.overflowDef;
    final OverflowDataCache overflowDataCache = overflowState.overflowDataCache;
    final int maxDataSize = VarLenBulkPageReader.BUFF_SZ;

    // Flush the cache across batches; its bounds are only valid within a single batch
    if (overflowState.currValueIdx == overflowState.numCommittedValues) {
      overflowDataCache.flush();
    }

    // Load some overflow data (null bits and offsets) for processing
    final int maxValues = Math.min(entry.getMaxEntries(), valuesToRead);
    final int numAvailableValues = overflowDataCache.load(overflowState.currValueIdx, maxValues);
    Preconditions.checkState(numAvailableValues > 0,
      "Number values to read [%s] should be greater than zero", numAvailableValues);

    final int firstValueDataOffset =
      getDataBufferStartOffset() + adjustDataOffset(overflowState.currValueIdx);
    int totalDataLen = 0;
    int currValueIdx = overflowState.currValueIdx;
    int idx = 0;
    int numNulls = 0;

    // Accumulate values until either the cache is exhausted or the next value would
    // exceed the maximum bulk-entry data size
    for ( ; idx < numAvailableValues; idx++, currValueIdx++) {
      // Is this value defined?
      if (!isNullable || overflowDataCache.getNullable(currValueIdx) == 1) {
        final int dataLen = overflowDataCache.getDataLength(currValueIdx);

        if ((totalDataLen + dataLen) > maxDataSize) {
          break;
        }
        totalDataLen += dataLen;
        valueLengths[idx] = dataLen;

      } else {
        valueLengths[idx] = -1; // -1 marks a null value
        ++numNulls;
      }
    }

    // idx == 0 means the very first (non-null) value alone exceeds maxDataSize;
    // it needs special handling
    if (idx == 0) {
      final int dataLen = overflowDataCache.getDataLength(currValueIdx);
      return handleLargeEntry(maxDataSize, firstValueDataOffset, dataLen);
    }

    // Update the next overflow value index to be processed
    overflowState.currValueIdx = currValueIdx;

    // Now set the bulk entry
    entry.set(firstValueDataOffset, totalDataLen, idx, idx - numNulls, overflowDef.buffer);

    return entry;
  }

  /**
   * @return remaining overflow data (total-overflow-data - (committed + returned-within-current-batch))
   */
  int getRemainingOverflowData() {
    return overflowState.getRemainingOverflowData();
  }

  /**
   * Handles a value too large to fit within the normal bulk-entry buffer; the entry is
   * returned referencing the overflow buffer directly (a single value per entry).
   *
   * @param maxDataSize maximum bulk entry data size (unused here; kept for call symmetry)
   * @param firstValueDataOffset absolute offset of the value's data within the overflow buffer
   * @param totalDataLen length in bytes of the large value
   * @return the reusable bulk entry holding the single large value, or an empty entry if
   *         batch memory constraints prevent consuming it now
   */
  private VarLenColumnBulkEntry handleLargeEntry(int maxDataSize, int firstValueDataOffset, int totalDataLen) {
    final FieldOverflowDefinition overflowDef = fieldOverflowContainer.overflowDef;
    final int[] valueLengths = entry.getValuesLength();

    // Is there enough memory to handle this large value?
    if (batchMemoryConstraintsReached(isNullable ? 1 : 0, 4, totalDataLen)) {
      entry.set(0, 0, 0, 0); // no data to be consumed
      return entry;
    }

    // Register the length
    valueLengths[0] = totalDataLen;

    // We already have all the information we need
    entry.set(firstValueDataOffset, totalDataLen, 1, 1, overflowDef.buffer);

    // Update the current value index
    overflowState.currValueIdx++;

    return entry;
  }

  /**
   * Lazily installs the overflow state within the container. An overflow happened in a
   * previous batch; this is the first time we are trying to consume this overflow data
   * (in some cases, several batches are needed if somehow the
   * number-of-records-per-batch becomes small).
   */
  void initOverflowStateIfNeeded() {
    if (fieldOverflowContainer.overflowState == null) {
      fieldOverflowContainer.overflowState =
        new FieldOverflowStateImpl(buffer, fieldOverflowContainer.overflowDef);
    }
  }

  /**
   * The overflow definition stores offsets without adjustment (the offsets still refer to the
   * original buffer). We need to perform a minor transformation to compute the correct offset
   * within the new buffer: adjusted-offset(value-idx) = offset(value-idx) - offset(value-0)
   *
   * @param valueIdx overflow value index
   * @return data offset of the value relative to the start of the overflow data section
   */
  private int adjustDataOffset(int valueIdx) {
    int firstOffset, targetOffset;
    if (!isNullable) {
      // Layout: <offsets><data>; offset-i lives at byte i*4
      firstOffset = fieldOverflowContainer.overflowDef.buffer.getInt(0);
      targetOffset = fieldOverflowContainer.overflowDef.buffer.getInt(valueIdx * 4);
    } else {
      // Layout: <nullable bytes><offsets><data>; offsets start after numValues nullable bytes
      final int numOverflowValues = fieldOverflowContainer.overflowDef.numValues;
      firstOffset = fieldOverflowContainer.overflowDef.buffer.getInt(numOverflowValues);
      targetOffset = fieldOverflowContainer.overflowDef.buffer.getInt(numOverflowValues + valueIdx * 4);
    }
    return targetOffset - firstOffset;
  }

  /**
   * @return absolute offset (within the overflow buffer) where the variable-length data begins
   */
  private int getDataBufferStartOffset() {
    if (!isNullable) {
      // <num-values+1 offsets><data>
      return (fieldOverflowContainer.overflowDef.numValues + 1) * 4;
    } else {
      // <num-values nullable bytes><num-values+1 offsets><data>
      return fieldOverflowContainer.overflowDef.numValues
        + (fieldOverflowContainer.overflowDef.numValues + 1) * 4;
    }
  }

  // ----------------------------------------------------------------------------
  // Inner Data Structure
  // ----------------------------------------------------------------------------

  /** Allows overflow reader to maintain overflow data state */
  final static class FieldOverflowStateImpl implements FieldOverflowState {
    /**
     * The number of overflow values consumed by previous batches; this means that if a new overflow
     * happens, then we should return uncommitted values (un-consumed)
     */
    private int numCommittedValues;
    /** Next value index to be processed */
    private int currValueIdx;
    /** A heap cache to accelerate loading of overflow data into bulk entries */
    private final OverflowDataCache overflowDataCache;

    private FieldOverflowStateImpl(ByteBuffer buffer, FieldOverflowDefinition overflowDef) {
      overflowDataCache = new OverflowDataCache(buffer, overflowDef);
    }

    /** {@inheritDoc} */
    @Override
    public void onNewBatchValuesConsumed(int numValues) {
      // Once all overflow values are committed, further notifications are ignored
      if (numCommittedValues < overflowDataCache.overflowDef.numValues) {
        numCommittedValues += numValues;
        currValueIdx = numCommittedValues; // restart reading at the commit point
      }
    }

    /** {@inheritDoc} */
    @Override
    public boolean isOverflowDataFullyConsumed() {
      return numCommittedValues == overflowDataCache.overflowDef.numValues;
    }

    /**
     * @return remaining overflow data (total-overflow-data - (committed + returned-within-current-batch))
     */
    int getRemainingOverflowData() {
      assert currValueIdx <= overflowDataCache.overflowDef.numValues;
      return overflowDataCache.overflowDef.numValues - currValueIdx;
    }
  }

  /** Enable us to reuse cached overflow data across calls */
  final static class OverflowDataCache {
    /** Cache format: [&lt;nullvalue&gt;*][&lt;offset&gt;*]; the last "-1" is to account for the extra offset to retrieve
     * since each value requires two offsets */
    private static final int MAX_NUM_VALUES =
      (VarLenBulkPageReader.BUFF_SZ / (BatchSizingMemoryUtil.INT_VALUE_WIDTH + 1)) - 1;
    /** Byte buffer for CPU caching */
    private final byte[] bufferArray;
    /** Overflow definition */
    private final FieldOverflowDefinition overflowDef;
    /** Whether this column is optional */
    final boolean isNullable;
    /** start index of the first cached overflow data */
    private int firstCachedValueIdx;
    /** number of cached values */
    private int numCachedValues;

    private OverflowDataCache(ByteBuffer buffer, FieldOverflowDefinition overflowDef) {
      this.bufferArray = buffer.array();
      this.overflowDef = overflowDef;
      this.isNullable = this.overflowDef.field.isNullable();
      this.firstCachedValueIdx = -1; // -1 indicates an empty cache
      this.numCachedValues = -1;
    }

    /** Invalidates the cache; next load() will re-read from the overflow buffer */
    private void flush() {
      this.firstCachedValueIdx = -1;
      this.numCachedValues = -1;
    }

    /**
     * Makes the null bits and offsets for values starting at {@code currentValueIdx}
     * available in the cache.
     *
     * @param currentValueIdx first overflow value index requested
     * @param maxValues maximum number of values the caller can process
     * @return number of values available in the cache starting at {@code currentValueIdx}
     */
    private int load(int currentValueIdx, int maxValues) {
      Preconditions.checkState(currentValueIdx < overflowDef.numValues,
        "Current value index [%s] should be smaller than the total number of overflow values [%s]",
        currentValueIdx, overflowDef.numValues);

      // Cache hit: the requested index already lies within the cached range
      if (numCachedValues > 0
        && currentValueIdx >= lowerBound()
        && currentValueIdx <= upperBound()) {

        return upperBound() - currentValueIdx + 1;
      }

      // Either cache is empty or the requested values are not in the cache
      loadInternal(currentValueIdx, maxValues);
      return numCachedValues;
    }

    /**
     * @return the lowest overflow value index in the cache
     */
    private int lowerBound() {
      return firstCachedValueIdx;
    }

    /**
     * @return the highest overflow value index in the cache
     */
    private int upperBound() {
      return firstCachedValueIdx + numCachedValues - 1;
    }

    /**
     * @param valueIdx overflow value index (must be within the cached range)
     * @return the null-indicator byte for this value (1 means the value is defined)
     */
    private byte getNullable(int valueIdx) {
      assert isNullable;
      assert valueIdx >= lowerBound();
      assert valueIdx <= upperBound();

      // We need to map the overflow value index to the buffer array representation
      final int cacheIdx = (valueIdx - lowerBound()) * BatchSizingMemoryUtil.BYTE_VALUE_WIDTH;
      return bufferArray[cacheIdx];
    }

    /**
     * @param valueIdx overflow value index (must be within the cached range)
     * @return the data length of this value, computed as offset(i+1) - offset(i)
     */
    private int getDataLength(int valueIdx) {
      assert valueIdx >= lowerBound();
      assert valueIdx <= upperBound();

      // We need to map the overflow value index to the buffer array representation;
      // for nullable columns the offsets are cached after numCachedValues null bytes
      int cacheIdx1, cacheIdx2;
      if (!isNullable) {
        cacheIdx1 = (valueIdx - lowerBound()) * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
      } else {
        cacheIdx1 = numCachedValues +
          (valueIdx - lowerBound()) * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
      }
      cacheIdx2 = cacheIdx1 + BatchSizingMemoryUtil.INT_VALUE_WIDTH;

      return VarLenOverflowReader.getInt(bufferArray, cacheIdx2)
        - VarLenOverflowReader.getInt(bufferArray, cacheIdx1);
    }

    /** Loads a new batch of overflow metadata (null bits and offsets) into the cache */
    private void loadInternal(int targetIdx, int maxValues) {
      firstCachedValueIdx = targetIdx;
      final int remaining = remaining();
      assert remaining > 0;

      final int maxValuesToLoad = Math.min(MAX_NUM_VALUES, maxValues);
      numCachedValues = Math.min(remaining, maxValuesToLoad);

      // Let us load the nullable & offsets data (the actual data doesn't have to be loaded)
      loadNullable();
      loadOffsets();
    }

    /** Copies the null-indicator bytes for the cached range to the front of the cache */
    void loadNullable() {
      if (!isNullable) {
        return; // NOOP
      }
      // Null bytes occupy one byte per value at the start of the overflow buffer
      overflowDef.buffer.getBytes(firstCachedValueIdx, bufferArray, 0, numCachedValues);
    }

    /** Copies the offsets for the cached range after any cached null bytes */
    void loadOffsets() {
      int sourceIdx, targetIdx;
      if (!isNullable) {
        sourceIdx = firstCachedValueIdx * BatchSizingMemoryUtil.INT_VALUE_WIDTH;
        targetIdx = 0;
      } else {
        // Offsets start after numValues null bytes in the overflow buffer
        sourceIdx = overflowDef.numValues
          + (firstCachedValueIdx * BatchSizingMemoryUtil.INT_VALUE_WIDTH);
        targetIdx = numCachedValues;
      }

      // We always get one extra value as to compute the length of value-i we need offset-i and offset-i+1
      overflowDef.buffer.getBytes(sourceIdx, bufferArray, targetIdx,
        (numCachedValues + 1) * BatchSizingMemoryUtil.INT_VALUE_WIDTH);
    }

    /**
     * @return number of overflow values at or after the first cached index
     */
    private int remaining() {
      return overflowDef.numValues - firstCachedValueIdx;
    }
  }
}