/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet.impl;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents the write-time state for a column including the writer and the (optional)
* backing vector. Implements per-column operations such as vector overflow. If a column
* is a (possibly repeated) map, then the column state will hold a tuple state.
* <p>
* If a column is not projected, then the writer exists (to make life easier for the
* reader), but there will be no vector backing the writer.
* <p>
* Different columns need different kinds of vectors: a data vector, possibly an offset
* vector, or even a non-existent vector. The {@link VectorState} class abstracts out
* these differences.
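* <p>
* A rough, illustrative sketch of the per-batch call sequence the loader
* drives for a column (the names {@code colState} and
* {@code expectedRowCount} are hypothetical; the actual sequencing lives
* in the loader and container state classes):
* <pre>{@code
* colState.updateCardinality(expectedRowCount);
* colState.startBatch(false);      // allocate (or restore) vectors
* // ... client writes values through colState.writer() ...
* colState.rollover();             // only if the batch overflows mid-row
* colState.harvestWithLookAhead(); // at the end of a batch that overflowed
* colState.close();                // when the loader shuts down
* }</pre>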
*/
public abstract class ColumnState {
private static final Logger logger = LoggerFactory.getLogger(ColumnState.class);
/**
* Primitive (non-map) column state. Handles all three cardinalities
* (required, optional, repeated).
* Column metadata is hosted on the writer.
*/
public static class PrimitiveColumnState extends ColumnState implements ColumnWriterListener {
public PrimitiveColumnState(LoaderInternals loader,
AbstractObjectWriter colWriter,
VectorState vectorState) {
super(loader, colWriter, vectorState);
WriterEvents scalarEvents;
if (colWriter.type() == ObjectType.ARRAY) {
scalarEvents = writer.array().entry().events();
} else {
scalarEvents = writer.events();
}
scalarEvents.bindListener(this);
}
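/**
* Called by the scalar writer to ask whether its backing vector may grow
* by the given number of bytes; the decision is delegated to the loader.
*/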
@Override
public boolean canExpand(ScalarWriter writer, int delta) {
return loader.canExpand(delta);
}
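/**
* Called by the scalar writer when its backing vector overflows; hands
* overflow processing off to the loader.
*/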
@Override
public void overflowed(ScalarWriter writer) {
loader.overflowed();
}
/**
* Get the output schema. For a primitive (non-structured) column,
* the output schema is the same as the internal schema.
*/
@Override
public ColumnMetadata outputSchema() { return schema(); }
@Override
public void dump(HierarchicalFormatter format) {
// No-op: diagnostic dump is not yet implemented for primitive columns.
}
}
public abstract static class BaseContainerColumnState extends ColumnState {
public BaseContainerColumnState(LoaderInternals loader,
AbstractObjectWriter writer, VectorState vectorState) {
super(loader, writer, vectorState);
}
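/**
* The state of the container (such as a map's tuple) nested within this
* column.
*/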
public abstract ContainerState container();
@Override
public void updateCardinality(int cardinality) {
super.updateCardinality(cardinality);
container().updateCardinality();
}
@Override
public void startBatch(boolean schemaOnly) {
super.startBatch(schemaOnly);
container().startBatch(schemaOnly);
}
@Override
public void rollover() {
super.rollover();
container().rollover();
}
@Override
public void harvestWithLookAhead() {
super.harvestWithLookAhead();
container().harvestWithLookAhead();
}
@Override
public void close() {
super.close();
container().close();
}
}
/**
* Columns move through various lifecycle states as identified by this
* enum. (Yes, sorry that the term "state" is used in two different ways
* here: the variables for a column and the point within the column
* lifecycle.)
*/
protected enum State {
/**
* Column is in the normal state of writing with no overflow
* in effect.
*/
NORMAL,
/**
* Like NORMAL, but means that the data has overflowed and the
* column's data for the current row appears in the new,
* overflow batch. For a client that omits some columns, written
* columns will be in OVERFLOW state, unwritten columns in
* NORMAL state.
*/
OVERFLOW,
/**
* Indicates that the column has data saved
* in the overflow batch.
*/
LOOK_AHEAD,
/**
* Like LOOK_AHEAD, but indicates the special case that the column
* was added after overflow, so there is no vector for the column
* in the harvested batch.
*/
NEW_LOOK_AHEAD
}
protected final LoaderInternals loader;
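/**
* Schema version of the loader as of the time this column was added.
* The version is bumped only for projected columns.
*/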
protected final int addVersion;
protected final VectorState vectorState;
protected State state;
protected AbstractObjectWriter writer;
/**
* Cardinality of the value itself. If this is an array,
* then this is the number of arrays. A separate number,
* the inner cardinality, is computed as the outer cardinality
* times the expected array count (from metadata). The inner
* cardinality is the total number of array items in the
* vector.
*/
protected int cardinality;
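/**
* Position of this column within the output container; -1 until
* {@link #buildOutput(TupleState)} assigns it.
*/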
protected int outputIndex = -1;
public ColumnState(LoaderInternals loader,
AbstractObjectWriter writer, VectorState vectorState) {
this.loader = loader;
this.vectorState = vectorState;
this.addVersion = writer.isProjected() ?
loader.bumpVersion() : loader.activeSchemaVersion();
this.state = loader.hasOverflow() ? State.NEW_LOOK_AHEAD : State.NORMAL;
this.writer = writer;
}
public AbstractObjectWriter writer() { return writer; }
public ColumnMetadata schema() { return writer.schema(); }
public VectorState vectorState() { return vectorState; }
public <T extends ValueVector> T vector() { return vectorState.vector(); }
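/**
* Allocate memory for the backing vector(s) using the current
* cardinality, and report the resulting allocation to the loader.
*/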
public void allocateVectors() {
assert cardinality != 0;
loader.tallyAllocations(vectorState.allocate(cardinality));
}
/**
* Prepare the column for a new row batch after overflow on the previous
* batch. Restore the look-ahead buffer to the
* active vector so we start writing where we left off.
*/
public void startBatch(boolean schemaOnly) {
switch (state) {
case NORMAL:
if (! schemaOnly) {
allocateVectors();
}
break;
case NEW_LOOK_AHEAD:
// Column is new, was not exchanged with backup vector
break;
case LOOK_AHEAD:
// Restore the look-ahead values to the main vector.
vectorState.startBatchWithLookAhead();
break;
default:
throw new IllegalStateException("Unexpected state: " + state);
}
// In all cases, we are back to normal writing.
state = State.NORMAL;
}
/**
* A column within the row batch overflowed. Prepare to absorb the rest of the
* in-flight row by rolling values over to a new vector, saving the complete
* vector for later. This column could have a value for the overflow row, or
* for some previous row, depending on exactly when and where the overflow
* occurs.
*/
public void rollover() {
assert state == State.NORMAL;
// If the source index is 0, then we could not fit this one
// value in the original vector. Nothing will be accomplished by
// trying again with an overflow vector. Just fail.
//
// Note that this is a judgment call. It is possible to allow the
// vector to double beyond the limit, but that will require a bit
// of thought to get right -- and, of course, completely defeats
// the purpose of limiting vector size to avoid memory fragmentation...
if (loader.rowIndex() == 0) {
throw UserException
.memoryError("A single column value is larger than the maximum allowed size of 16 MB")
.build(logger);
}
// Otherwise, do the roll-over to a look-ahead vector.
vectorState.rollover(cardinality);
// Remember that we did this overflow processing.
state = State.OVERFLOW;
}
/**
* Writing of a row batch is complete. Prepare the vector for harvesting
* to send downstream. If this batch encountered overflow, set aside the
* look-ahead vector and put the full vector buffer back into the active
* vector.
*/
public void harvestWithLookAhead() {
switch (state) {
case NEW_LOOK_AHEAD:
// If added after overflow, no data to save from the complete
// batch: the vector does not appear in the completed batch.
break;
case OVERFLOW:
// Otherwise, restore the original, full buffer and
// last write position.
vectorState.harvestWithLookAhead();
// Remember that we have look-ahead values stashed away in the
// backup vector.
state = State.LOOK_AHEAD;
break;
default:
throw new IllegalStateException("Unexpected state: " + state);
}
}
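/**
* Whether this column is backed by a real vector. An unprojected column
* has a writer but no backing vector.
*/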
public boolean isProjected() {
return vectorState.isProjected();
}
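/**
* Set the outer cardinality: the number of values (or, for an array,
* the number of arrays) expected in the next batch, used when
* allocating vectors.
*/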
public void updateCardinality(int cardinality) {
this.cardinality = cardinality;
}
public int outerCardinality() { return cardinality; }
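/**
* Cardinality of values within the vector: for an array column, the
* outer cardinality times the expected element count, capped at the
* maximum row count; otherwise the same as the outer cardinality.
*/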
public int innerCardinality() {
ColumnMetadata schema = schema();
if (schema.isArray()) {
// Multiply out the cardinality, but apply reasonable limits: assume
// at least one element per array, and cap at the maximum row count to
// prevent cardinality explosions for deeply nested arrays.
int elementCount = Math.max(1, schema.expectedElementCount());
return Math.min(ValueVector.MAX_ROW_COUNT, cardinality * elementCount);
} else {
return cardinality;
}
}
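/**
* Add this column's vector and output schema to the output tuple,
* recording the column's position within that tuple.
*/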
public void buildOutput(TupleState tupleState) {
outputIndex = tupleState.addOutputColumn(vector(), outputSchema());
}
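/**
* The schema of this column as it appears in the output container, which
* may differ from the internal schema for structured columns.
*/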
public abstract ColumnMetadata outputSchema();
public void close() {
vectorState.close();
}
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
.attribute("addVersion", addVersion)
.attribute("state", state)
.attributeIdentity("writer", writer)
.attribute("vectorState");
vectorState.dump(format);
format.endObject();
}
}