/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.drill.exec.physical.resultSet.ProjectionSet;
import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
import org.apache.drill.exec.physical.resultSet.impl.ColumnState.BaseContainerColumnState;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
/**
* Represents the loader state for a tuple: a row or a map. This is "state" in
* the sense of variables that are carried along with each tuple. Handles
* write-time issues such as defining new columns, allocating memory, handling
* overflow, assembling the output version of the map, and so on. Each
* row and map in the result set has a tuple state instance associated
* with it.
* <p>
* Here, by "tuple" we mean a container of vectors, each of which holds
* a variety of values. So, the "tuple" here is structural, not a specific
* set of values, but rather the collection of vectors that hold tuple
* values.
* <p>
* Drill vector containers and maps are both tuples, but they irritatingly
* have completely different APIs for working with their child vectors.
* These classes act as a proxy that wraps the two APIs to provide a common
* view for use by the result set loader and its internals.
*
* <h4>Output Container</h4>
*
* Builds the harvest vector container that contains only the columns
* included in the harvest schema version. That is, it excludes columns
* added while writing an overflow row.
* <p>
* Because a Drill row is actually a hierarchy, walks the internal hierarchy
* and builds a corresponding output hierarchy.
* <ul>
* <li>The root node is the row itself (vector container),</li>
* <li>Internal nodes are maps (structures),</li>
* <li>Leaf nodes are primitive vectors (which may be arrays).</li>
* </ul>
* The basic algorithm is to identify the version of the output schema,
* then add any new columns added up to that version. This object maintains
* the output container across batches, meaning that updates are incremental:
* we need only add columns that are new since the last update. And, those new
* columns will always appear directly after all existing columns in the row
* or in a map.
* <p>
* A special case occurs when columns are added in the overflow row. These
* columns <i>do not</i> appear in the output container for the main part
* of the batch; instead they appear in the <i>next</i> output container
* that includes the overflow row.
* <p>
* Since the output container may hold only a subset of the internal columns, an
* interesting case occurs for maps. The map vectors in the output container are
* <b>not</b> the same as those used internally. Because the internal and output
* maps may hold different sets of columns, they must be distinct vectors. The
* set of child vectors (except for child maps) is, however, shared.
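* <p>
* For orientation, here is a hedged, client-side sketch of this machinery as
* seen through the result set loader. It is illustrative only: the
* {@code rsLoader} instance, the column name and the literal values are
* assumptions, not part of this class.
* <pre>{@code
* ResultSetLoader rsLoader = ...;            // created elsewhere
* RowSetLoader rowWriter = rsLoader.writer();
*
* rsLoader.startBatch();
* rowWriter.addColumn(MaterializedField.create("a",
*     Types.required(MinorType.INT)));
* while (!rowWriter.isFull()) {
*   rowWriter.start();
*   rowWriter.scalar("a").setInt(10);
*   rowWriter.save();
* }
* // Columns added before overflow appear in this container; a column added
* // while writing the overflow row appears only in the next batch's container.
* VectorContainer batch = rsLoader.harvest();
* }</pre>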
*/
public abstract class TupleState extends ContainerState
implements AbstractTupleWriter.TupleWriterListener {
/**
* Represents a map column (either single or repeated). Includes maps that
* are top-level, nested within other maps, or nested inside a union.
* Schema management is a bit complex:
* <table border=1>
* <tr><th rowspan=2>Condition</th><th colspan=2>Action</th></tr>
* <tr><th>Outside of Union</th><th>Inside of Union</th></tr>
* <tr><td>Unprojected</td><td>Omitted from output</td><td>N/A</td></tr>
* <tr><td>Added in prior batch</td><td colspan=2>Included in output</td></tr>
* <tr><td>Added in present batch, before overflow</td>
* <td colspan=2>Included in output</td></tr>
* <tr><td>Added in present batch, after overflow</td>
* <td>Omitted from output this batch (added next batch)</td>
* <td>Included in output</td></tr>
* </table>
* <p>
* The above rules say that, for maps in a union, the output schema
* is identical to the internal writer schema. But, for maps outside
* of a union, the output schema is a subset of the internal schema with
* two types of omissions:
* <ul>
* <li>Unprojected columns</li>
* <li>Columns added after overflow</li>
* </ul>
* <p>
* New columns can be added at any time by data readers that discover
* their schema as data is read (such as JSON). In this case, new columns
* always appear at the end of the map (remember, in Drill, a "map" is actually
* a structure: an ordered, named list of columns). So, when looking for newly
* added columns, they will always be at the end.
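* <p>
* A minimal sketch of discovering a new map member at write time, assuming a
* row writer {@code rowWriter} with a map column named {@code "m"} (the writer
* and the column names are assumptions made for the example):
* <pre>{@code
* TupleWriter mapWriter = rowWriter.tuple("m");
* // New columns are always appended at the end of the map's schema.
* int colIndex = mapWriter.addColumn(MaterializedField.create("c",
*     Types.optional(MinorType.VARCHAR)));
* mapWriter.scalar(colIndex).setString("value");
* }</pre>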
*/
public static class MapColumnState extends BaseContainerColumnState {
protected final MapState mapState;
protected boolean isVersioned;
protected final ColumnMetadata outputSchema;
public MapColumnState(MapState mapState,
AbstractObjectWriter writer,
VectorState vectorState,
boolean isVersioned) {
super(mapState.loader(), writer, vectorState);
this.mapState = mapState;
mapState.bindColumnState(this);
this.isVersioned = isVersioned;
if (isVersioned) {
outputSchema = schema().cloneEmpty();
} else {
outputSchema = schema();
}
mapState.bindOutputSchema(outputSchema.mapSchema());
}
public MapState mapState() { return mapState; }
@Override
public ContainerState container() { return mapState; }
@Override
public boolean isProjected() {
return mapState.hasProjections();
}
/**
* Indicates whether this map is versioned. A versioned map has three attributes:
* <ol>
* <li>Columns can be unprojected. (Columns appear as writers for the client
* of the result set loader, but are not materialized and do not appear in
* the projected output container.)</li>
* <li>Columns appear in the output only if added before the overflow row.</li>
* <li>As a result, the output schema is a subset of the internal input
* schema.</li>
* </ol>
* @return <tt>true</tt> if this map is versioned as described above
*/
public boolean isVersioned() { return isVersioned; }
@Override
public ColumnMetadata outputSchema() { return outputSchema; }
}
/**
* State for a map vector. If the map is repeated, it will have an offset
* vector. The map vector itself is a pseudo-vector that is simply a
* container for other vectors, and so needs no management itself.
*/
public static class MapVectorState implements VectorState {
private final AbstractMapVector mapVector;
private final VectorState offsets;
public MapVectorState(AbstractMapVector mapVector, VectorState offsets) {
this.mapVector = mapVector;
this.offsets = offsets;
}
@Override
public int allocate(int cardinality) {
// The mapVector is a pseudo-vector; nothing to allocate.
return offsets.allocate(cardinality);
}
@Override
public void rollover(int cardinality) {
offsets.rollover(cardinality);
}
@Override
public void harvestWithLookAhead() {
offsets.harvestWithLookAhead();
}
@Override
public void startBatchWithLookAhead() {
offsets.startBatchWithLookAhead();
}
@Override
public void close() {
offsets.close();
}
@SuppressWarnings("unchecked")
@Override
public AbstractMapVector vector() { return mapVector; }
public VectorState offsetVectorState() { return offsets; }
@Override
public boolean isProjected() {
return offsets.isProjected();
}
@Override
public void dump(HierarchicalFormatter format) {
// TODO
}
}
/**
* Handles the details of the top-level tuple, the data row itself.
* Note that by "row" we mean the set of vectors that define the
* set of rows.
*/
public static class RowState extends TupleState {
/**
* The row-level writer for stepping through rows as they are written,
* and for accessing top-level columns.
*/
private final RowSetLoaderImpl writer;
/**
* The projected set of columns presented to the consumer of the
* row set loader. Excludes unprojected columns, even though those columns
* are visible to the consumer through the writers. Also excludes columns
* added during an overflow row.
*/
private final VectorContainer outputContainer;
public RowState(ResultSetLoaderImpl rsLoader, ResultVectorCache vectorCache) {
super(rsLoader, vectorCache, rsLoader.projectionSet);
writer = new RowSetLoaderImpl(rsLoader, schema);
writer.bindListener(this);
outputContainer = new VectorContainer(rsLoader.allocator());
outputSchema = new TupleSchema();
}
public RowSetLoaderImpl rootWriter() { return writer; }
@Override
public AbstractTupleWriter writer() { return writer; }
@Override
public int innerCardinality() { return loader.targetRowCount();}
/**
* The row as a whole is versioned.
*
* @return <tt>true</tt>
*/
@Override
protected boolean isVersioned() { return true; }
@Override
protected void updateOutput(int curSchemaVersion) {
super.updateOutput(curSchemaVersion);
outputContainer.buildSchema(SelectionVectorMode.NONE);
}
@Override
public int addOutputColumn(ValueVector vector, ColumnMetadata colSchema) {
outputContainer.add(vector);
final int index = outputSchema.addColumn(colSchema);
assert outputContainer.getNumberOfColumns() == outputSchema.size();
return index;
}
public VectorContainer outputContainer() { return outputContainer; }
}
/**
* Represents a tuple defined as a Drill map: single or repeated. Note that
* the map vector does not exist here; it is assembled only when "harvesting"
* a batch. This design supports the obscure case in which a new column
* is added during an overflow row: the column exists within this abstraction,
* but is not published to the map that makes up the output.
* <p>
* The map state is associated with a map vector. This vector is built
* either at harvest time (normal maps) or on the fly (union maps).
*/
public static abstract class MapState extends TupleState {
public MapState(LoaderInternals events,
ResultVectorCache vectorCache,
ProjectionSet projectionSet) {
super(events, vectorCache, projectionSet);
}
public void bindColumnState(MapColumnState colState) {
super.bindColumnState(colState);
writer().bindListener(this);
}
@Override
public int addOutputColumn(ValueVector vector, ColumnMetadata colSchema) {
final AbstractMapVector mapVector = parentColumn.vector();
if (isVersioned()) {
mapVector.putChild(colSchema.name(), vector);
}
final int index = outputSchema.addColumn(colSchema);
assert mapVector.size() == outputSchema.size();
assert mapVector.getField().getChildren().size() == outputSchema.size();
return index;
}
@Override
protected void addColumn(ColumnState colState) {
super.addColumn(colState);
// If the map is materialized (because it is nested inside a union),
// then add the new vector to the map at add time. But, for top-level
// maps, or those nested inside other maps (but not inside a union or
// repeated list), defer adding the column until harvest time, to allow
// for the case that new columns are added in the overflow row. Such
// columns may be required (non-nullable), and so do not allow
// back-filling. But, inside unions, all columns must be nullable, so
// back-filling of nulls is possible.
if (! isVersioned()) {
final AbstractMapVector mapVector = parentColumn.vector();
mapVector.putChild(colState.schema().name(), colState.vector());
}
}
/**
* A map is within a union if its map vector has already been materialized.
* Top-level maps (and maps nested inside other maps) are versioned: their
* vectors are built at harvest time. But, due to the complexity of unions,
* maps within unions are materialized up front and are not versioned. This
* check, delegated to the parent column, ensures that maps are treated as
* unversioned regardless of nesting depth within a union.
*/
@Override
protected boolean isVersioned() {
return ((MapColumnState) parentColumn).isVersioned();
}
@Override
public int innerCardinality() {
return parentColumn.innerCardinality();
}
@Override
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
.attribute("column", parentColumn.schema().name())
.attribute("cardinality", innerCardinality())
.endObject();
}
}
public static class SingleMapState extends MapState {
public SingleMapState(LoaderInternals events,
ResultVectorCache vectorCache,
ProjectionSet projectionSet) {
super(events, vectorCache, projectionSet);
}
/**
* Return the tuple writer for the map. If this is a single
* map, then it is the writer itself. If this is a map array,
* then the tuple is nested inside the array.
*/
@Override
public AbstractTupleWriter writer() {
return (AbstractTupleWriter) parentColumn.writer().tuple();
}
}
public static class MapArrayState extends MapState {
public MapArrayState(LoaderInternals events,
ResultVectorCache vectorCache,
ProjectionSet projectionSet) {
super(events, vectorCache, projectionSet);
}
/**
* Return the tuple writer for the map. If this is a single
* map, then it is the writer itself. If this is a map array,
* then the tuple is nested inside the array.
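* <p>
* Illustrative client-side counterpart, assuming a repeated-map column
* named {@code "m"} on the row writer (the name is an assumption):
* <pre>{@code
* TupleWriter memberWriter = rowWriter.array("m").tuple();
* }</pre>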
*/
@Override
public AbstractTupleWriter writer() {
return (AbstractTupleWriter) parentColumn.writer().array().tuple();
}
}
/**
* The set of columns added via the writers: includes both projected
* and unprojected columns. (The writer is free to add columns that the
* query does not project; the result set loader creates a dummy column
* and dummy writer, then does not project the column to the output.)
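* <p>
* A hedged sketch of the dummy-column behavior, assuming a row writer whose
* projection list includes {@code "a"} but not {@code "b"} (the names and the
* projection list are assumptions for illustration):
* <pre>{@code
* rowWriter.addColumn(MaterializedField.create("b",
*     Types.optional(MinorType.INT)));
* // The write is accepted, but lands in a dummy vector and never
* // reaches the output container.
* rowWriter.scalar("b").setInt(20);
* }</pre>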
*/
protected final List<ColumnState> columns = new ArrayList<>();
/**
* Internal writer schema that matches the column list.
*/
protected final TupleMetadata schema = new TupleSchema();
/**
* Metadata description of the output container (for the row) or map
* (for a single or repeated map).
* <p>
* Rows and maps have an output schema which may differ from the internal schema.
* The output schema excludes unprojected columns. It also excludes
* columns added in an overflow row.
* <p>
* The output schema is built slightly differently for maps inside a
* union vs. normal top-level (or nested) maps. Maps inside a union do
* not defer columns because of the muddy semantics (and infrequent use)
* of unions.
*/
protected TupleMetadata outputSchema;
private int prevHarvestIndex = -1;
protected TupleState(LoaderInternals events,
ResultVectorCache vectorCache,
ProjectionSet projectionSet) {
super(events, vectorCache, projectionSet);
}
protected void bindOutputSchema(TupleMetadata outputSchema) {
this.outputSchema = outputSchema;
}
/**
* Returns an ordered set of the columns which make up the tuple.
* Column order is the same as that defined by the map's schema,
* to allow indexed access. New columns always appear at the end
* of the list to preserve indexes.
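* <p>
* For example (illustrative only; {@code tupleState} and the index are
* stand-ins), the column state at a given index pairs with the writer at
* the same index:
* <pre>{@code
* ColumnState colState = tupleState.columns().get(0);
* ObjectWriter colWriter = tupleState.writer().column(0);
* }</pre>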
*
* @return ordered list of column states for the columns within
* this tuple
*/
public List<ColumnState> columns() { return columns; }
public TupleMetadata schema() { return writer().tupleSchema(); }
public abstract AbstractTupleWriter writer();
@Override
public ObjectWriter addColumn(TupleWriter tupleWriter, MaterializedField column) {
return addColumn(tupleWriter, MetadataUtils.fromField(column));
}
@Override
public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) {
return BuildFromSchema.instance().buildColumn(this, columnSchema);
}
@Override
protected void addColumn(ColumnState colState) {
columns.add(colState);
}
public boolean hasProjections() {
for (final ColumnState colState : columns) {
if (colState.isProjected()) {
return true;
}
}
return false;
}
@Override
protected Collection<ColumnState> columnStates() {
return columns;
}
protected void updateOutput(int curSchemaVersion) {
// Scan all columns
for (int i = 0; i < columns.size(); i++) {
final ColumnState colState = columns.get(i);
// Ignore unprojected columns
if (! colState.writer().isProjected()) {
continue;
}
// If this is a new column added since the last output, then we may have
// to add the column to this output. For the row itself, and for maps
// outside of unions, if the column was added after the output schema
// version cutoff, skip that column for now. But, if this tuple is
// within a union, then we always add all columns because union
// semantics are too muddy to play the deferred column game. Further,
// all columns in a map within a union must be nullable, so we know we
// can fill the column with nulls. (Something that is not true for
// normal maps.)
if (i > prevHarvestIndex && (! isVersioned() || colState.addVersion <= curSchemaVersion)) {
colState.buildOutput(this);
prevHarvestIndex = i;
}
// If the column is a map, then we have to recurse into the map
// itself. If the map is inside a union, then the map's vectors
// already appear in the map vector, but we still must update the
// output schema.
if (colState.schema().isMap()) {
final MapState childMap = ((MapColumnState) colState).mapState();
childMap.updateOutput(curSchemaVersion);
}
}
}
public abstract int addOutputColumn(ValueVector vector, ColumnMetadata colSchema);
public TupleMetadata outputSchema() { return outputSchema; }
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
.attributeArray("columns");
for (int i = 0; i < columns.size(); i++) {
format.element(i);
columns.get(i).dump(format);
}
format
.endArray()
.endObject();
}
}