| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.scan.v3.lifecycle; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.drill.exec.expr.BasicTypeHelper; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; |
| import org.apache.drill.exec.record.VectorContainer; |
| import org.apache.drill.exec.record.metadata.ColumnMetadata; |
| import org.apache.drill.exec.record.metadata.TupleMetadata; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.exec.vector.complex.AbstractMapVector; |
| import org.apache.drill.exec.vector.complex.MapVector; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Builds an output batch based on an output schema and one or more input |
| * schemas. The input schemas must represent disjoint subsets of the output |
| * schema. |
| * <p> |
| * Handles maps, which can overlap at the map level (two inputs can hold a map |
| * column named {@code `m`}, say), but the map members must be disjoint. Applies |
| * the same rule recursively to nested maps. |
| * <p> |
| * Maps must be built with members in the same order as the corresponding |
| * schema. Though maps are usually thought of as unordered name/value pairs, |
| * they are actually tuples, with both a name and a defined ordering. |
| * <p> |
| * This code uses a name lookup in maps because the semantics of maps do not |
| * guarantee a uniform numbering of members from {@code 0} to {@code n-1}, where |
| * {@code n} is the number of map members. Map members are ordered, but the |
| * ordinal used by the map vector is not necessarily sequential. |
| * <p> |
| * Once the output container is built, the same value vectors reside in the |
| * input and output containers. This works because Drill requires vector |
| * persistence: the same vectors must be presented downstream in every batch |
| * until a schema change occurs. |
| * |
| * <h4>Projection</h4> |
| * |
| * To visualize projection, assume we have numbered table columns, lettered |
| * implicit, null or partition columns:<pre><code> |
| * [ 1 | 2 | 3 | 4 ] Table columns in table order |
| * [ A | B | C ] Static columns |
| * </code></pre> |
| * Now, we wish to project them into select order. |
| * Let's say that the SELECT clause looked like this, with "t" |
| * indicating table columns:<pre><code> |
| * SELECT t2, t3, C, B, t1, A, t2 ... |
| * </code></pre> |
| * Then the projection looks like this:<pre><code> |
| * [ 2 | 3 | C | B | 1 | A | 2 ] |
| * </code></pre> |
| * Often, not all table columns are projected. In this case, the |
| * result set loader presents the full table schema to the reader, |
| * but actually writes only the projected columns. Suppose we |
| * have:<pre><code> |
| * SELECT t3, C, B, t1, A ... |
| * </code></pre> |
| * Then the abbreviated table schema looks like this:<pre><code> |
| * [ 1 | 3 ]</code></pre> |
| * Note that table columns retain their table ordering. |
| * The projection looks like this:<pre><code> |
| * [ 2 | C | B | 1 | A ] |
| * </code></pre> |
| * <p> |
| * The projector is created once per schema, then can be reused for any |
| * number of batches. |
| * <p> |
| * Merging is done in one of two ways, depending on the input source: |
| * <ul> |
| * <li>For the table loader, the merger discards any data in the output, |
| * then exchanges the buffers from the input columns to the output, |
| * leaving projected columns empty. Note that unprojected columns must |
| * be cleared by the caller.</li> |
| * <li>For implicit and null columns, the output vector is identical |
| * to the input vector.</li> |
| * </ul> |
| */ |
| public class OutputBatchBuilder { |
| |
| /** |
| * Describes an input batch with a schema and a vector container. |
| */ |
| public static class BatchSource { |
| private final TupleMetadata schema; |
| private final VectorContainer container; |
| |
| public BatchSource(TupleMetadata schema, VectorContainer container) { |
| this.schema = schema; |
| this.container = container; |
| } |
| } |
| |
| /** |
| * Describes a vector source: an index to a batch source (or nested map |
| * source), and an offset into that source. (Actually, an offset into the |
| * source schema which is the same as the container offset for the top-level |
| * row, but which translates to a name lookup for maps.) |
| */ |
| private static class VectorSource { |
| protected final int source; |
| protected final int offset; |
| |
| public VectorSource(int source, int offset) { |
| this.source = source; |
| this.offset = offset; |
| } |
| |
| @Override |
| public String toString() { |
| return "[source=" + source + |
| ", offset=" + offset + "]"; |
| } |
| } |
| |
| /** |
| * Source map as a map schema and map vector. |
| */ |
| public static class MapSource { |
| protected final TupleMetadata mapSchema; |
| protected final AbstractMapVector mapVector; |
| |
| public MapSource(TupleMetadata mapSchema, AbstractMapVector mapVector) { |
| this.mapSchema = mapSchema; |
| this.mapVector = mapVector; |
| } |
| } |
| |
| /** |
| * Construct a map from an output schema and a set of input maps. The logic |
| * is very similar to that for a batch, but just different enough that we need |
| * a separate implementation. |
| */ |
| private static class MapBuilder { |
| private final ColumnMetadata outputCol; |
| private final TupleMetadata mapSchema; |
| private final List<MapSource> sourceMaps; |
| private final Object memberSources[]; |
| private final List<MapVector> mapVectors = new ArrayList<>(); |
| |
| private MapBuilder(ColumnMetadata outputCol, List<MapSource> sourceMaps) { |
| this.outputCol = outputCol; |
| this.mapSchema = outputCol.tupleSchema(); |
| this.sourceMaps = sourceMaps; |
| this.memberSources = new Object[mapSchema.size()]; |
| for (int i = 0; i < sourceMaps.size(); i++) { |
| defineSourceMapMapping(sourceMaps.get(i).mapSchema, i); |
| } |
| } |
| |
| /** |
| * Define the mapping for one of the sources. Mappings are |
| * stored in output order as a set of (source, offset) pairs. |
| */ |
| @SuppressWarnings("unchecked") |
| private void defineSourceMapMapping(TupleMetadata sourceSchema, int source) { |
| for (int i = 0; i < sourceSchema.size(); i++) { |
| ColumnMetadata col = sourceSchema.metadata(i); |
| int outputIndex = mapSchema.index(col.name()); |
| Preconditions.checkState(outputIndex >= 0); |
| VectorSource vectorSource = new VectorSource(source, i); |
| if (col.isMap()) { |
| if (memberSources[outputIndex] == null) { |
| memberSources[outputIndex] = new ArrayList<VectorSource>(); |
| } |
| ((List<VectorSource>) memberSources[outputIndex]).add(vectorSource); |
| } else { |
| assert memberSources[outputIndex] == null; |
| memberSources[outputIndex] = vectorSource; |
| } |
| } |
| } |
| |
| /** |
| * Creates a new output map vector to hold the merger of member |
| * vectors from the source maps. |
| */ |
| @SuppressWarnings("unchecked") |
| public AbstractMapVector build(BufferAllocator allocator) { |
| AbstractMapVector mapVector = (AbstractMapVector) |
| BasicTypeHelper.getNewVector(outputCol.name(), |
| allocator, outputCol.majorType(), null); |
| for (int i = 0; i < mapSchema.size(); i++) { |
| ColumnMetadata outputCol = mapSchema.metadata(i); |
| ValueVector outputVector; |
| if (outputCol.isMap()) { |
| outputVector = buildNestedMap(allocator, outputCol, (List<VectorSource>) memberSources[i]); |
| } else { |
| outputVector = getMember((VectorSource) memberSources[i]); |
| } |
| mapVector.putChild(outputCol.name(), outputVector); |
| } |
| if (mapVector instanceof MapVector) { |
| mapVectors.add((MapVector) mapVector); |
| } |
| return mapVector; |
| } |
| |
| private ValueVector buildNestedMap(BufferAllocator allocator, ColumnMetadata outputCol, |
| List<VectorSource> vectorSources) { |
| List<MapSource> childMaps = new ArrayList<>(); |
| for (VectorSource source : vectorSources) { |
| childMaps.add( |
| new MapSource( |
| sourceMaps.get(source.source).mapSchema |
| .metadata(source.offset).tupleSchema(), |
| (AbstractMapVector) getMember(source))); |
| } |
| MapBuilder builder = new MapBuilder(outputCol, childMaps); |
| ValueVector vector = builder.build(allocator); |
| mapVectors.addAll(builder.mapVectors); |
| return vector; |
| } |
| |
| public ValueVector getMember(VectorSource source) { |
| MapSource sourceMap = sourceMaps.get(source.source); |
| ColumnMetadata sourceCol = sourceMap.mapSchema.metadata(source.offset); |
| return sourceMap.mapVector.getChild(sourceCol.name()); |
| } |
| } |
| |
| private final TupleMetadata outputSchema; |
| private final List<BatchSource> sources; |
| private final Object vectorSources[]; |
| private final VectorContainer outputContainer; |
| private final List<MapVector> mapVectors = new ArrayList<>(); |
| |
| public OutputBatchBuilder(TupleMetadata outputSchema, List<BatchSource> sources, |
| BufferAllocator allocator) { |
| this.outputSchema = outputSchema; |
| this.sources = sources; |
| this.outputContainer = new VectorContainer(allocator); |
| this.vectorSources = new Object[outputSchema.size()]; |
| for (int i = 0; i < sources.size(); i++) { |
| defineSourceBatchMapping(sources.get(i).schema, i); |
| } |
| physicalProjection(); |
| } |
| |
| /** |
| * Define the mapping for one of the sources. Mappings are |
| * stored in output order as a set of (source, offset) pairs. |
| */ |
| @SuppressWarnings("unchecked") |
| protected void defineSourceBatchMapping(TupleMetadata schema, int source) { |
| for (int i = 0; i < schema.size(); i++) { |
| ColumnMetadata col = schema.metadata(i); |
| int outputIndex = outputSchema.index(col.name()); |
| Preconditions.checkState(outputIndex >= 0); |
| VectorSource vectorSource = new VectorSource(source, i); |
| |
| // If the column is a regular map, then projection may select |
| // some but not all columns. If the columns is a map array, then |
| // SQL projection cannot identify which columns are wanted. Include |
| // the entire map. |
| if (col.isMap() && !col.isArray()) { |
| if (vectorSources[outputIndex] == null) { |
| vectorSources[outputIndex] = new ArrayList<VectorSource>(); |
| } |
| ((List<VectorSource>) vectorSources[outputIndex]).add(vectorSource); |
| } else { |
| assert vectorSources[outputIndex] == null; |
| vectorSources[outputIndex] = vectorSource; |
| } |
| } |
| } |
| |
| /** |
| * Project the source vectors to the output container, merging |
| * maps along the way. |
| */ |
| @SuppressWarnings("unchecked") |
| private void physicalProjection() { |
| outputContainer.removeAll(); |
| mapVectors.clear(); |
| for (int i = 0; i < outputSchema.size(); i++) { |
| ColumnMetadata outputCol = outputSchema.metadata(i); |
| ValueVector outputVector; |
| if (outputCol.isMap() && !outputCol.isArray()) { |
| outputVector = buildTopMap(outputCol, (List<VectorSource>) vectorSources[i]); |
| } else { |
| outputVector = getVector((VectorSource) vectorSources[i]); |
| } |
| outputContainer.add(outputVector); |
| } |
| outputContainer.buildSchema(SelectionVectorMode.NONE); |
| } |
| |
| private ValueVector buildTopMap(ColumnMetadata outputCol, |
| List<VectorSource> vectorSources) { |
| List<MapSource> sourceMaps = new ArrayList<>(); |
| for (VectorSource source : vectorSources) { |
| sourceMaps.add( |
| new MapSource( |
| sources.get(source.source).schema.metadata(source.offset).tupleSchema(), |
| (AbstractMapVector) getVector(source))); |
| } |
| MapBuilder builder = new MapBuilder(outputCol, sourceMaps); |
| ValueVector vector = builder.build(outputContainer.getAllocator()); |
| // Map vectors are a nuisance: they carry their own value count which |
| // must be set separately from the underling data vectors. |
| mapVectors.addAll(builder.mapVectors); |
| return vector; |
| } |
| |
| public ValueVector getVector(VectorSource source) { |
| return sources.get(source.source).container |
| .getValueVector(source.offset).getValueVector(); |
| } |
| |
| public void load(int rowCount) { |
| outputContainer.setRecordCount(rowCount); |
| for (MapVector v : mapVectors) { |
| v.setMapValueCount(rowCount); |
| } |
| } |
| |
| public VectorContainer outputContainer() { return outputContainer; } |
| |
| /** |
| * Release per-reader resources. Does not release the actual value |
| * vectors as those reside in a cache. |
| */ |
| public void close() { |
| outputContainer.removeAll(); |
| } |
| } |