/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.record;
import com.google.common.base.Preconditions;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import java.util.List;
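/**
 * Tracks the size of incoming and outgoing record batches and computes how many rows an
 * outgoing batch may hold so that it stays within the configured output batch size.
 *
 * <p>A minimal usage sketch (illustrative only; {@code memoryManager}, {@code incoming},
 * {@code outgoing} and {@code copiedRows} are hypothetical names, not taken from an actual
 * operator):</p>
 * <pre>{@code
 *   // once per incoming batch, before copying rows to the output:
 *   memoryManager.update(incoming, DEFAULT_INPUT_INDEX);
 *   int targetRows = memoryManager.getOutputRowCount();
 *   memoryManager.allocateVectors(outgoing, targetRows);
 *   // ... copy up to targetRows rows into the outgoing container ...
 *   memoryManager.updateOutgoingStats(copiedRows);
 * }</pre>
 */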
public class RecordBatchMemoryManager {
// The " - 1 " in the number of rows below was chosen to avoid a waste of (possible) offset vectors memory
// where a power-of-2 rows size needlessly doubles the offset vector (see DRILL-5446)
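  // Illustrative example (numbers assumed here, not quoted from DRILL-5446): with 65536 rows, a 4-byte
  // offset vector needs 65537 entries (256 KB + 4 bytes), forcing its allocation to double to 512 KB;
  // with 65535 rows it fits exactly in 256 KB.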
protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT - 1;
protected static final int MIN_NUM_ROWS = 1;
protected static final int DEFAULT_INPUT_INDEX = 0;
private int outputRowCount = MAX_NUM_ROWS;
  // Max row count allowed in the current output batch. It never exceeds the allocated size (i.e. outputRowCount),
  // but may drop lower: when early rows are narrow the outgoing batch can hold many output rows, but if later
  // incoming rows become wide, fewer rows than planned fit into the remaining allocated memory.
private int currentOutgoingMaxRowCount = MAX_NUM_ROWS;
private int outgoingRowWidth;
private final int outputBatchSize;
private final RecordBatchSizer[] sizer;
private final BatchStats[] inputBatchStats;
private final BatchStats outputBatchStats;
  // By default, we expect one input batch stream and one output batch stream.
  // Some operators receive multiple input batch streams; for example, joins get two
  // (left and right), and the Merge Receiver can get more than two.
private int numInputs = 1;
  /**
   * Operator metric stats collected for a single batch stream.
   */
  private class BatchStats {
    private long numBatches;     // number of batches seen on this stream
    private long sumBatchSizes;  // sum of the net sizes (in bytes) of those batches
    private long totalRecords;   // total number of records across those batches
public long getNumBatches() {
return numBatches;
}
public long getTotalRecords() {
return totalRecords;
}
public long getAvgBatchSize() {
return RecordBatchSizer.safeDivide(sumBatchSizes, numBatches);
}
public long getAvgRowWidth() {
return RecordBatchSizer.safeDivide(sumBatchSizes, totalRecords);
}
public void incNumBatches() {
++numBatches;
}
public void incSumBatchSizes(long batchSize) {
sumBatchSizes += batchSize;
}
public void incTotalRecords(long numRecords) {
totalRecords += numRecords;
}
}
public long getNumOutgoingBatches() {
return outputBatchStats.getNumBatches();
}
public long getTotalOutputRecords() {
return outputBatchStats.getTotalRecords();
}
public long getAvgOutputBatchSize() {
return outputBatchStats.getAvgBatchSize();
}
public long getAvgOutputRowWidth() {
return outputBatchStats.getAvgRowWidth();
}
public long getNumIncomingBatches() {
return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getNumBatches();
}
public long getAvgInputBatchSize() {
return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getAvgBatchSize();
}
public long getAvgInputRowWidth() {
return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getAvgRowWidth();
}
public long getTotalInputRecords() {
return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getTotalRecords();
}
public long getNumIncomingBatches(int index) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getNumBatches();
}
public long getAvgInputBatchSize(int index) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgBatchSize();
}
public long getAvgInputRowWidth(int index) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgRowWidth();
}
public long getTotalInputRecords(int index) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getTotalRecords();
}
public RecordBatchMemoryManager(int numInputs, int configuredOutputSize) {
this.numInputs = numInputs;
this.outputBatchSize = configuredOutputSize;
sizer = new RecordBatchSizer[numInputs];
inputBatchStats = new BatchStats[numInputs];
outputBatchStats = new BatchStats();
}
public RecordBatchMemoryManager(int configuredOutputSize) {
this.outputBatchSize = configuredOutputSize;
sizer = new RecordBatchSizer[numInputs];
inputBatchStats = new BatchStats[numInputs];
outputBatchStats = new BatchStats();
}
  // The variants below are no-ops here; subclasses override the ones they need.
  public void update(int inputIndex) {
  }
  public void update() {
  }
  public void update(RecordBatch recordBatch) {
  }
public void update(RecordBatch recordBatch, int index) {
// Get sizing information for the batch.
setRecordBatchSizer(index, new RecordBatchSizer(recordBatch));
setOutgoingRowWidth(getRecordBatchSizer(index).getNetRowWidth());
// Number of rows in outgoing batch
setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).getNetRowWidth());
updateIncomingStats(index);
}
public void update(int inputIndex, int outputPosition, boolean useAggregate) {
throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public void update(int inputIndex, int outputPosition) {
throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public void update(RecordBatch batch, int inputIndex, int outputPosition) {
throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public boolean updateIfNeeded(int newOutgoingRowWidth) {
    // Avoid re-adjusting the batch holders' target row count for small variations in row width.
    // If the row width changes, compute the adjusted row count (rounded down to the nearest
    // power of two, minus one) and do nothing if that adjusted count does not change.
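    // Illustrative numbers (assumed, not from the source): with a 16 MB output batch, row widths of
    // 300 and 320 bytes both adjust to 32767 rows, so no update happens; a jump to 600 bytes adjusts
    // to 16383 rows and does trigger an update.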
if (newOutgoingRowWidth == outgoingRowWidth ||
computeOutputRowCount(outputBatchSize, newOutgoingRowWidth) == computeOutputRowCount(outputBatchSize, outgoingRowWidth)) {
return false;
}
// Set number of rows in outgoing batch. This number will be used for new batch creation.
setOutputRowCount(outputBatchSize, newOutgoingRowWidth);
setOutgoingRowWidth(newOutgoingRowWidth);
return true;
}
  /**
   * Returns the maximum number of rows a newly allocated output batch can hold. Valid after
   * update() has been called on this memory manager.
   * @return max output row count for a new output batch
   */
public int getOutputRowCount() {
return outputRowCount;
}
  /**
   * Returns the maximum number of rows that the current, already allocated output batch can hold.
   * @return max output row count for the current output batch
   */
public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; }
/**
   * Given a target batch size and row width, sets the output row count, bounded by the
   * allowed minimum (MIN_NUM_ROWS) and maximum (MAX_NUM_ROWS).
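   * <p>Illustrative example (numbers assumed): with a 16 MB target batch size and 100-byte rows, the raw
   * count is about 167,773, which is then capped at MAX_NUM_ROWS (65535).</p>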
*/
public void setOutputRowCount(int targetBatchSize, int rowWidth) {
this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth));
}
public void setOutputRowCount(int outputRowCount) {
Preconditions.checkArgument(outputRowCount <= MAX_NUM_ROWS);
this.outputRowCount = outputRowCount;
}
  /**
   * Sets the max row count that the current (already allocated) output batch can contain.
   * Since this setter does not adjust the input value, it is capped at MAX_NUM_ROWS.
   * @param newTargetOutputCount the new target row count for the current output batch
   */
public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) {
this.currentOutgoingMaxRowCount = Math.min(MAX_NUM_ROWS, newTargetOutputCount);
}
/**
   * Adjusts the given row count to honor the allowed minimum and maximum.
   * The count is rounded down to (the nearest power of two) - 1 for better memory utilization;
   * the "- 1" accounts for the extra entry needed by offset vectors (see DRILL-5446).
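   * <p>For example (illustrative values): a raw row count of 6000 has highest one bit 4096, so the
   * adjusted count is 4095.</p>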
*/
public static int adjustOutputRowCount(int rowCount) {
    return Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS));
}
public static int computeOutputRowCount(int batchSize, int rowWidth) {
return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth));
}
public void setOutgoingRowWidth(int outgoingRowWidth) {
this.outgoingRowWidth = outgoingRowWidth;
}
public int getOutgoingRowWidth() {
return outgoingRowWidth;
}
public void setRecordBatchSizer(int index, RecordBatchSizer sizer) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
this.sizer[index] = sizer;
if (inputBatchStats[index] == null) {
inputBatchStats[index] = new BatchStats();
}
}
public void setRecordBatchSizer(RecordBatchSizer sizer) {
setRecordBatchSizer(DEFAULT_INPUT_INDEX, sizer);
}
public RecordBatchSizer getRecordBatchSizer(int index) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
return sizer[index];
}
public RecordBatchSizer getRecordBatchSizer() {
return sizer[DEFAULT_INPUT_INDEX];
}
public RecordBatchSizer.ColumnSize getColumnSize(int index, String name) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
return sizer[index].getColumn(name);
}
public RecordBatchSizer.ColumnSize getColumnSize(String name) {
for (int index = 0; index < numInputs; index++) {
if (sizer[index] == null || sizer[index].getColumn(name) == null) {
continue;
}
return sizer[index].getColumn(name);
}
return null;
}
public void updateIncomingStats(int index) {
Preconditions.checkArgument(index >= 0 && index < numInputs);
Preconditions.checkArgument(inputBatchStats[index] != null);
inputBatchStats[index].incNumBatches();
inputBatchStats[index].incSumBatchSizes(sizer[index].getNetBatchSize());
inputBatchStats[index].incTotalRecords(sizer[index].rowCount());
}
public void updateIncomingStats() {
inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches();
inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].getNetBatchSize());
inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount());
}
public void updateOutgoingStats(int outputRecords) {
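    // Note: the outgoing batch size is estimated as (current row width) * (record count) rather than
    // measured from the actual outgoing container.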
outputBatchStats.incNumBatches();
outputBatchStats.incTotalRecords(outputRecords);
outputBatchStats.incSumBatchSizes(outgoingRowWidth * outputRecords);
}
public int getOutputBatchSize() {
return outputBatchSize;
}
public int getOffsetVectorWidth() {
return UInt4Vector.VALUE_WIDTH;
}
public void allocateVectors(VectorContainer container, int recordCount) {
// Allocate memory for the vectors.
// This will iteratively allocate memory for all nested columns underneath.
for (VectorWrapper<?> w : container) {
RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
colSize.allocateVector(w.getValueVector(), recordCount);
}
container.setRecordCount(0);
}
public void allocateVectors(List<ValueVector> valueVectors, int recordCount) {
// Allocate memory for the vectors.
// This will iteratively allocate memory for all nested columns underneath.
for (ValueVector v : valueVectors) {
RecordBatchSizer.ColumnSize colSize = getColumnSize(v.getField().getName());
colSize.allocateVector(v, recordCount);
}
}
public void allocateVectors(VectorContainer container) {
allocateVectors(container, outputRowCount);
}
public void allocateVectors(List<ValueVector> valueVectors) {
allocateVectors(valueVectors, outputRowCount);
}
}