blob: 6c2770635309e713be128f867e529f88b23b2033 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.drill.exec.physical.resultSet.impl;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.scan.project.projSet.ProjectionSetFactory;
import org.apache.drill.exec.physical.resultSet.ProjectionSet;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.resultSet.impl.TupleState.RowState;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Implementation of the result set loader. Caches vectors
* for a row or map.
* @see ResultSetLoader
public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals {
// Overview of the state transitions visible in this class:
// - startBatch(boolean) ends by setting the state to ACTIVE.
// - overflowed() moves ACTIVE to IN_OVERFLOW while the overflow row is
//   rolled over, then to OVERFLOW.
// - saveRow() may move ACTIVE to FULL_BATCH.
// - harvest() ends a normal batch in HARVESTED, or an overflowed batch in
//   LOOK_AHEAD.
// - close() sets the state to CLOSED.
// NOTE(review): this listing is missing many lines (braces, case labels,
// some statements); confirm details against the full source.
* Read-only set of options for the result set loader.
// Immutable (all fields final) bundle of loader settings, created either
// with defaults or from an OptionBuilder.
public static class ResultSetOptions {
// Defaults to ValueVector.MAX_BUFFER_SIZE; exposed by the loader's
// targetVectorSize().
protected final int vectorSizeLimit;
// Initial value of the loader's targetRowCount; defaults to DEFAULT_ROW_COUNT.
protected final int rowCountLimit;
// Optional; when null the loader creates a NullResultVectorCacheImpl.
protected final ResultVectorCache vectorCache;
// Defaults to ProjectionSetFactory.projectAll().
protected final ProjectionSet projectionSet;
// Optional up-front schema; when null, columns are added as discovered.
protected final TupleMetadata schema;
// Batch size limit consulted by canExpand(); a value <= 0 disables that
// check. Defaults to -1.
protected final long maxBatchSize;
* Context for error messages.
protected final CustomErrorContext errorContext;
// Default options: maximum vector size, default row count, project all
// columns, no cache, no schema, no batch-size limit, no error context.
public ResultSetOptions() {
vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE;
rowCountLimit = DEFAULT_ROW_COUNT;
projectionSet = ProjectionSetFactory.projectAll();
vectorCache = null;
schema = null;
maxBatchSize = -1;
errorContext = null;
// Copies settings from the builder, substituting project-all when the
// builder has no projection set.
public ResultSetOptions(OptionBuilder builder) {
vectorSizeLimit = builder.vectorSizeLimit;
rowCountLimit = builder.rowCountLimit;
vectorCache = builder.vectorCache;
schema = builder.schema;
maxBatchSize = builder.maxBatchSize;
errorContext = builder.errorContext;
// NOTE(review): the "else" arm of this conditional is missing from this
// listing; presumably builder.projectionSet — confirm.
projectionSet = builder.projectionSet == null ?
ProjectionSetFactory.projectAll() :
// Writes the size limits to the given formatter.
// NOTE(review): the receiver of the .attribute(...) chain and its
// terminating call are missing from this listing — confirm.
public void dump(HierarchicalFormatter format) {
.attribute("vectorSizeLimit", vectorSizeLimit)
.attribute("rowCountLimit", rowCountLimit)
private enum State {
// NOTE(review): the enum constants themselves are missing from this listing;
// only their documentation survives. The constants referenced elsewhere in
// this class are START, ACTIVE, OVERFLOW, IN_OVERFLOW, FULL_BATCH,
// HARVESTED, LOOK_AHEAD and CLOSED. The comments below appear to describe
// them in that order — confirm against the full source.
* Before the first batch.
* Writing to a batch normally.
* Batch overflowed a vector while writing. Can continue
* to write to a temporary "overflow" batch until the
* end of the current row.
* Temporary state to avoid batch-size related overflow while
* an overflow is in progress.
* Batch is full due to reaching the row count limit
* when saving a row.
* No more writes allowed until harvesting the current batch.
* Current batch was harvested: data is gone. No lookahead
* batch exists.
* Current batch was harvested and its data is gone. However,
* overflow occurred during that batch and the data exists
* in the overflow vectors.
* <p>
* This state needs special consideration. The column writer
* structure maintains its state (offsets, etc.) from the OVERFLOW
* state, but the buffers currently in the vectors are from the
* complete batch. <b>No writes can be done in this state!</b>
* The writer state does not match the data in the buffers.
* The code here does what it can to catch this state. But, if
* some client tries to write to a column writer in this state,
* bad things will happen. Doing so is invalid (the write is outside
* of a batch), so this is not a terrible restriction.
* <p>
* Said another way, the current writer state is invalid with respect
* to the active buffers, but only if the writers try to act on the
* buffers. Since the writers won't do so, this temporary state is
* fine. The correct buffers are restored once a new batch is started
* and the state moves to ACTIVE.
* Mutator is closed: no more operations are allowed.
// Class-wide SLF4J logger.
protected static final Logger logger = LoggerFactory.getLogger(ResultSetLoaderImpl.class);
* Options provided to this loader.
private final ResultSetOptions options;
* Allocator for vectors created by this loader.
private final BufferAllocator allocator;
* Builds columns (vector, writer, state).
private final ColumnBuilder columnBuilder;
* Internal structure used to work with the vectors (real or dummy) used
* by this loader.
private final RowState rootState;
* Top-level writer index that steps through the rows as they are written.
* When an overflow batch is in effect, indexes into that batch instead.
* Since a batch is really a tree of tuples, in which some branches of
* the tree are arrays, the root indexes here feeds into array indexes
* within the writer structure that points to the current position within
* an array column.
private final WriterIndexImpl writerIndex;
* The row-level writer for stepping through rows as they are written,
* and for accessing top-level columns.
private final RowSetLoaderImpl rootWriter;
* Tracks the state of the row set loader. Handling vector overflow requires
* careful stepping through a variety of states as the write proceeds.
private State state = State.START;
* Track the current schema as seen by the writer. Each addition of a column
* anywhere in the schema causes the active schema version to increase by one.
* This allows very easy checks for schema changes: save the prior version number
* and compare it against the current version number.
private int activeSchemaVersion;
* Track the current schema as seen by the consumer of the batches that this
* loader produces. The harvest schema version can be behind the active schema
* version in the case in which new columns are added to the overflow row.
* Since the overflow row won't be visible to the harvested batch, that batch
* sees the schema as it existed at a prior version: the harvest schema
* version.
private int harvestSchemaVersion;
* Counts the batches harvested (sent downstream) from this loader. Does
* not include the current, in-flight batch.
// NOTE(review): no increment of this counter is visible in this listing;
// batchCount() reads it — confirm where it is updated.
private int harvestBatchCount;
* Counts the rows included in previously-harvested batches. Does not
* include the number of rows in the current batch.
// Incremented by harvest().
private int previousRowCount;
* Number of rows in the harvest batch. If an overflow batch is in effect,
* then this is the number of rows in the "main" batch before the overflow;
* that is the number of rows in the batch that will be harvested. If no
* overflow row is in effect, then this number is undefined (and should be
* zero.)
// Set by overflowed(); reset to 0 by startBatch(boolean).
private int pendingRowCount;
* The number of rows per batch. Starts with the configured amount. Can be
* adjusted between batches, perhaps based on the actual observed size of
* input data.
// Initialized from options.rowCountLimit; setTargetRowCount() keeps it >= 1.
private int targetRowCount;
* Total bytes allocated to the current batch.
// Reset to 0 by startBatch(boolean) and overflowed(); grown by canExpand()
// and tallyAllocations().
protected int accumulatedBatchSize;
// Copied from options.projectionSet by the constructor.
protected final ProjectionSet projectionSet;
// Creates a loader that allocates vectors from the given allocator using
// the given options. Note that `this` is handed to WriterIndexImpl and
// RowState before construction has finished.
public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options) {
this.allocator = allocator;
this.options = options;
targetRowCount = options.rowCountLimit;
writerIndex = new WriterIndexImpl(this);
columnBuilder = new ColumnBuilder();
// Set the projections
projectionSet = options.projectionSet;
// Determine the root vector cache
ResultVectorCache vectorCache;
if (options.vectorCache == null) {
vectorCache = new NullResultVectorCacheImpl(allocator);
} else {
vectorCache = options.vectorCache;
// Build the row set model depending on whether a schema is provided.
rootState = new RowState(this, vectorCache);
rootWriter = rootState.rootWriter();
// If no schema, columns will be added incrementally as they
// are discovered. Start with an empty model.
if (options.schema != null) {
// Schema provided. Populate a model (and create vectors) for the
// provided schema. The schema can be extended later, but normally
// won't be if known up front.
// NOTE(review): the string concatenation below runs even when debug
// logging is disabled; a "{}" placeholder would avoid that.
logger.debug("Schema: " + options.schema.toString());
BuildFromSchema.instance().buildTuple(rootWriter, options.schema);
// If we want to project nothing, then we do, in fact, have a
// valid schema, it just happens to be an empty schema. Bump the
// schema version so we know we have that empty schema.
// This accomplishes a result similar to the "legacy" readers
// achieve by adding a dummy column.
// NOTE(review): the body of this "if" (per the comment above,
// presumably a schema-version bump) and the closing braces are missing
// from this listing — confirm.
if (projectionSet.isEmpty()) {
// NOTE(review): only the signature of this method survives in this listing;
// its body is missing. overflowed() mentions updating "the latest array
// cardinality" before roll-over, which may be where this is used — confirm.
private void updateCardinality() {
// Creates a loader backed by the given allocator with default options.
public ResultSetLoaderImpl(BufferAllocator allocator) {
  this(allocator, new ResultSetLoaderImpl.ResultSetOptions());
}
// Returns the allocator supplied when this loader was constructed.
public BufferAllocator allocator() {
  return this.allocator;
}
// Records a schema change (the field docs state each added column bumps
// the active schema version) and returns the active schema version.
// NOTE(review): the statement that increments activeSchemaVersion, the
// switch's break/closing braces, and any other case labels are missing from
// this listing. As shown, only START also publishes the new version to
// harvestSchemaVersion — confirm against the full source.
public int bumpVersion() {
// Update the active schema version. We cannot update the published
// schema version at this point because a column later in this same
// row might cause overflow, and any new columns in this row will
// be hidden until a later batch. But, if we are between batches,
// then it is fine to add the column to the schema.
switch (state) {
case START:
harvestSchemaVersion = activeSchemaVersion;
return activeSchemaVersion;
// Returns the writer-side schema version, which may include columns added
// in an overflow row that the consumer cannot yet see.
public int activeSchemaVersion() {
  return this.activeSchemaVersion;
}
// Returns the schema version appropriate to the current state: the active
// (writer) version while ACTIVE, the harvest (published) version in START.
// NOTE(review): further case labels and the "default:" label before the
// throw appear to be missing from this listing — confirm which other states
// return harvestSchemaVersion.
public int schemaVersion() {
switch (state) {
case ACTIVE:
// Write in progress: use current writer schema
return activeSchemaVersion;
case START:
// Batch is published. Use harvest schema.
return harvestSchemaVersion;
// Not really in a position to give a schema
// version.
throw new IllegalStateException("Unexpected state: " + state);
// Starts a new batch of data.
// NOTE(review): the body is missing from this listing; presumably it
// delegates to startBatch(false) — confirm.
public void startBatch() {
* Start a batch to report only schema without data.
// NOTE(review): the body is missing from this listing; presumably it
// delegates to startBatch(true) — confirm.
public void startEmptyBatch() {
// Begins a new batch: resets per-batch counters, publishes any schema
// changes carried over from an overflow row, and enters ACTIVE.
// NOTE(review): this listing is missing the case labels for the "after
// overflow" path and for the "default:" throw, plus the vector/writer calls
// described in the comments. The schemaOnly parameter is not referenced in
// the visible lines — confirm against the full source.
public void startBatch(boolean schemaOnly) {
switch (state) {
case START:
logger.trace("Start batch");
accumulatedBatchSize = 0;
// The previous batch ended without overflow, so start
// a new batch, and reset the write index to 0.
// A row overflowed so keep the writer index at its current value
// as it points to the second row in the overflow batch. However,
// the last write position of each writer must be restored on
// a column-by-column basis, which is done by the visitor.
logger.trace("Start batch after overflow");
// Note: no need to do anything with the writers; they were left
// pointing to the correct positions in the look-ahead batch.
// The above simply puts the look-ahead vectors back "under"
// the writers.
throw new IllegalStateException("Unexpected state: " + state);
// Update the visible schema with any pending overflow batch
// updates.
harvestSchemaVersion = activeSchemaVersion;
pendingRowCount = 0;
state = State.ACTIVE;
// Reports whether the current batch holds any rows. While ACTIVE this is
// decided by the root writer's row count.
// NOTE(review): the case labels guarding "return true" and "return false"
// (presumably overflow/full states and a default) are missing from this
// listing — confirm.
public boolean hasRows() {
switch (state) {
case ACTIVE:
return rootWriter.rowCount() > 0;
return true;
return false;
public RowSetLoader writer() {
if (state == State.CLOSED) {
throw new IllegalStateException("Unexpected state: " + state);
return rootWriter;
// Convenience method that writes one row and returns this loader for chaining.
// NOTE(review): as listed, `values` is never used; the line that writes it
// (presumably through rootWriter) seems to be missing from this listing —
// confirm.
public ResultSetLoader setRow(Object... values) {
return this;
* Called before writing a new row. Implementation of
* {@link RowSetLoader#start()}.
// Prepares the loader for the next row.
// NOTE(review): the listing is missing at least a "break" after the ACTIVE
// case and the labels (e.g. "default:") for the throw. Read literally,
// ACTIVE would fall through to the throw — confirm against the full source.
protected void startRow() {
switch (state) {
case ACTIVE:
// Update the visible schema with any pending overflow batch
// updates.
harvestSchemaVersion = activeSchemaVersion;
throw new IllegalStateException("Unexpected state: " + state);
* Finalize the current row. Implementation of
* {@link RowSetLoader#save()}.
// Completes the current row. In ACTIVE state it may move the loader to
// FULL_BATCH and publishes the active schema version. In the overflow case
// it stays in the same state and leaves the harvest schema version unchanged.
// NOTE(review): the condition of the "if" below is truncated in this listing
// (presumably a writer-index advance check). The OVERFLOW label, the writer
// calls described by the comments, the breaks and the "default:" label are
// also missing — confirm against the full source.
protected void saveRow() {
switch (state) {
case ACTIVE:
if (! {
state = State.FULL_BATCH;
// No overflow row. Advertise the schema version to the client.
harvestSchemaVersion = activeSchemaVersion;
// End the value of the look-ahead row in the look-ahead vectors.
// Advance the writer index relative to the look-ahead batch.;
// Stay in the overflow state. Doing so will cause the writer
// to report that it is full.
// Also, do not change the harvest schema version. We will
// expose to the downstream operators the schema in effect
// at the start of the row. Columns added within the row won't
// appear until the next batch.
throw new IllegalStateException("Unexpected state: " + state);
* Implementation of {@link RowSetLoader#isFull()}
* @return true if the batch is full (reached vector capacity or the
* row count limit), false if more rows can be added
// While ACTIVE the batch is full once the writer index is no longer valid.
// NOTE(review): the case labels guarding "return true" (presumably the
// overflow/full states) and "return false" (presumably "default:") are
// missing from this listing — confirm.
protected boolean isFull() {
switch (state) {
case ACTIVE:
return ! writerIndex.valid();
return true;
return false;
public boolean writeable() {
return state == State.ACTIVE || state == State.OVERFLOW;
private boolean isBatchActive() {
return state == State.ACTIVE || state == State.OVERFLOW ||
state == State.FULL_BATCH;
* Implementation of {@link RowSetLoader#rowCount()}.
* @return the number of rows to be sent downstream for this
* batch. Does not include the overflow row.
// While ACTIVE, returns the writer index size. Other states return either
// pendingRowCount (the main-batch rows saved when overflow occurred) or 0.
// NOTE(review): the case labels selecting between "return pendingRowCount"
// and "return 0" are missing from this listing — confirm.
protected int rowCount() {
switch (state) {
case ACTIVE:
return writerIndex.size();
return pendingRowCount;
return 0;
// Exposes the top-level writer index shared with the row writers.
protected WriterIndexImpl writerIndex() {
  return this.writerIndex;
}
public void setTargetRowCount(int rowCount) {
targetRowCount = Math.max(1, rowCount);
// Returns the current target number of rows per batch.
public int targetRowCount() {
  return this.targetRowCount;
}
// Returns the configured per-vector size limit from the loader options.
public int targetVectorSize() {
  final ResultSetOptions opts = options;
  return opts.vectorSizeLimit;
}
public int skipRows(int requestedCount) {
// Can only skip rows when a batch is active.
if (state != State.ACTIVE) {
throw new IllegalStateException("No batch is active.");
// Skip as many rows as the vector limit allows.
return writerIndex.skipRows(requestedCount);
public boolean isProjectionEmpty() {
return ! rootState.hasProjections();
// Called when a vector overflows while writing. In ACTIVE state it records
// the completed row count, resets the batch size tally, and ends in
// OVERFLOW. A second overflow while already in OVERFLOW raises a memory
// error; any other state is rejected with IllegalStateException.
// NOTE(review): the roll-over and writer-adjustment calls described by the
// comments below, and the closing braces, are missing from this listing.
public void overflowed() {
logger.trace("Vector overflow");
// If we see overflow when we are already handling overflow, it means
// that a single value is too large to fit into an entire vector.
// Fail the query.
// Note that this is a judgment call. It is possible to allow the
// vector to double beyond the limit, but that will require a bit
// of thought to get right -- and, of course, completely defeats
// the purpose of limiting vector size to avoid memory fragmentation...
// Individual columns handle the case in which overflow occurs on the
// first row of the main batch. This check handles the pathological case
// in which we successfully overflowed, but then another column
// overflowed during the overflow row -- that indicates that that one
// column can't fit in an empty vector. That is, this check is for a
// second-order overflow.
// NOTE(review): the message hard-codes "16 MB", but the configured limit is
// options.vectorSizeLimit (settable through OptionBuilder, exposed by
// targetVectorSize()), so the text can be wrong for non-default limits.
// The builder chain's terminating call is also missing from this listing.
if (state == State.OVERFLOW) {
throw UserException
.memoryError("A single column value is larger than the maximum allowed size of 16 MB")
if (state != State.ACTIVE) {
throw new IllegalStateException("Unexpected state: " + state);
// Transient state while the overflow row is being rolled over.
state = State.IN_OVERFLOW;
// Preserve the number of rows in the now-complete batch.
pendingRowCount = writerIndex.vectorIndex();
// Roll-over will allocate new vectors. Update with the latest
// array cardinality.
// Wrap up the completed rows into a batch. Sets
// vector value counts. The rollover data still exists so
// it can be moved, but it is now past the recorded
// end of the vectors (though, obviously, not past the
// physical end.)
// Roll over vector values.
accumulatedBatchSize = 0;
// Adjust writer state to match the new vector values. This is
// surprisingly easy if we note that the current row is shifted to
// the 0 position in the new vector, so we just shift all offsets
// downward by the current row position at each repeat level.
// The writer index is reset back to 0. Because of the above roll-over
// processing, some vectors may now already have values in the 0 slot.
// However, the vector that triggered overflow has not yet written to
// the current record, and so will now write to position 0. After the
// completion of the row, all 0-position values should be written (or
// at least those provided by the client.)
// For arrays, the writer might have written a set of values
// (v1, v2, v3), and v4 might have triggered the overflow. In this case,
// the array values have been moved, offset vectors adjusted, the
// element writer adjusted, so that v4 will be written to index 3
// to produce (v1, v2, v3, v4, v5, ...) in the look-ahead vector.
// Remember that overflow is in effect.
state = State.OVERFLOW;
// True while an overflow (look-ahead) row is in effect.
public boolean hasOverflow() {
  final boolean overflowing = state == State.OVERFLOW;
  return overflowing;
}
public VectorContainer outputContainer() {
return rootState.outputContainer();
// Finishes the current batch and returns the output container. An ACTIVE
// batch is harvested via harvestNormalBatch(); an overflowed batch via
// harvestOverflowBatch(). The harvested row count is added to
// previousRowCount.
// NOTE(review): the case label before harvestOverflowBatch(), the breaks,
// and the "default:" label before the throw are missing from this listing —
// confirm which states reach each branch.
public VectorContainer harvest() {
int rowCount;
switch (state) {
case ACTIVE:
rowCount = harvestNormalBatch();
logger.trace("Harvesting {} rows", rowCount);
rowCount = harvestOverflowBatch();
logger.trace("Harvesting {} rows after overflow", rowCount);
throw new IllegalStateException("Unexpected state: " + state);
VectorContainer container = rootState.outputContainer();
// Finalize: update counts, set state.
previousRowCount += rowCount;
return container;
// Harvests a batch with no overflow row: publishes the active schema
// version, moves to HARVESTED and returns the writer index size as the row
// count.
// NOTE(review): the "wrap up" call described by the comment below appears to
// be missing from this listing — confirm.
private int harvestNormalBatch() {
// Wrap up the vectors: final fill-in, set value count, etc.
harvestSchemaVersion = activeSchemaVersion;
state = State.HARVESTED;
return writerIndex.size();
// Harvests a batch that ended in overflow: moves to LOOK_AHEAD and returns
// pendingRowCount (the rows saved before the overflow row). The harvest
// schema version is not touched here.
// NOTE(review): other lines of this listing are missing; confirm whether a
// vector harvest call also belongs here.
private int harvestOverflowBatch() {
state = State.LOOK_AHEAD;
return pendingRowCount;
public TupleMetadata harvestSchema() {
return rootState.outputSchema();
// Closes the loader and moves it to CLOSED. The vector cache is deliberately
// left open because the caller owns it.
// NOTE(review): the body of the "already closed" check (presumably an early
// return) and any resource-release calls are missing from this listing —
// confirm.
public void close() {
if (state == State.CLOSED) {
// Do not close the vector cache; the caller owns that and
// will, presumably, reuse those vectors for another writer.
state = State.CLOSED;
public int batchCount() {
return harvestBatchCount + (rowCount() == 0 ? 0 : 1);
public int totalRowCount() {
int total = previousRowCount;
if (isBatchActive()) {
total += pendingRowCount + writerIndex.size();
return total;
// Returns the internal state for the root tuple.
public RowState rootState() {
  return this.rootState;
}
public boolean canExpand(int delta) {
accumulatedBatchSize += delta;
return state == State.IN_OVERFLOW ||
options.maxBatchSize <= 0 ||
accumulatedBatchSize <= options.maxBatchSize;
public void tallyAllocations(int allocationBytes) {
accumulatedBatchSize += allocationBytes;
* Log and check the initial vector allocation. If a batch size
* limit is set, warn if the initial allocation exceeds the limit.
* This will occur if the target row count is incorrect for the
* data size.
private void checkInitialAllocation() {
if (options.maxBatchSize < 0) {
logger.debug("Initial vector allocation: {}, no batch limit specified",
else if (accumulatedBatchSize > options.maxBatchSize) {
logger.warn("Initial vector allocation: {}, but batch size limit is: {}",
accumulatedBatchSize, options.maxBatchSize);
} else {
logger.debug("Initial vector allocation: {}, batch size limit: {}",
accumulatedBatchSize, options.maxBatchSize);
// Writes the loader's diagnostic state (row index, state, schema versions,
// pending and target row counts) to the given formatter.
// NOTE(review): the receiver that starts the .attribute(...) chain (and any
// nested dump of the options) is missing from this listing — confirm.
public void dump(HierarchicalFormatter format) {
.attribute("index", writerIndex.vectorIndex())
.attribute("state", state)
.attribute("activeSchemaVersion", activeSchemaVersion)
.attribute("harvestSchemaVersion", harvestSchemaVersion)
.attribute("pendingRowCount", pendingRowCount)
.attribute("targetRowCount", targetRowCount);
public ResultVectorCache vectorCache() {
return rootState.vectorCache();
public int rowIndex() {
return writerIndex().vectorIndex();
// Returns the helper that builds columns (vector, writer, state).
public ColumnBuilder columnBuilder() {
  return this.columnBuilder;
}
// Returns the error context from the loader options; may be null when the
// default options were used.
public CustomErrorContext context() {
  final ResultSetOptions opts = options;
  return opts.errorContext;
}