| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.scan.project; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.drill.common.exceptions.CustomErrorContext; |
| import org.apache.drill.common.expression.SchemaPath; |
| import org.apache.drill.common.types.TypeProtos.MajorType; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.ops.FragmentContext; |
| import org.apache.drill.exec.physical.base.PhysicalOperator; |
| import org.apache.drill.exec.physical.impl.protocol.OperatorDriver; |
| import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch; |
| import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents; |
| import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec; |
| import org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver; |
| import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; |
| import org.apache.drill.exec.physical.impl.scan.project.projSet.TypeConverter; |
| import org.apache.drill.exec.physical.resultSet.impl.ResultVectorCacheImpl; |
| import org.apache.drill.exec.record.VectorContainer; |
| import org.apache.drill.exec.record.metadata.TupleMetadata; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Performs projection of a record reader, along with a set of static |
| * columns, to produce the final "public" result set (record batch) |
 * for the scan operator. Primarily solves the "vector permanence"
| * problem: that the scan operator must present the same set of vectors |
| * to downstream operators despite the fact that the scan operator hosts |
| * a series of readers, each of which builds its own result set. |
| * <p> |
| * Provides the option to continue a schema from one batch to the next. |
| * This can reduce spurious schema changes in formats, such as JSON, with |
| * varying fields. It is not, however, a complete solution as the outcome |
| * still depends on the order of file scans and the division of files across |
| * readers. |
| * <p> |
| * Provides the option to infer the schema from the first batch. The "quick path" |
| * to obtain the schema will read one batch, then use that schema as the returned |
| * schema, returning the full batch in the next call to <tt>next()</tt>. |
| * |
 * <h4>Publishing the Final Result Set</h4>
| * |
| * This class "publishes" a vector container that has the final, projected |
 * form of a scan. The projected schema includes:
| * <ul> |
| * <li>Columns from the reader.</li> |
| * <li>Static columns, such as implicit or partition columns.</li> |
| * <li>Null columns for items in the select list, but not found in either |
| * of the above two categories.</li> |
| * </ul> |
| * The order of columns is that set by the select list (or, by the reader for |
 * a <tt>SELECT *</tt> query).
| * |
| * <h4>Schema Handling</h4> |
| * |
| * The mapping handles a variety of cases: |
| * <ul> |
| * <li>An early-schema table (one in which we know the schema and |
 * the schema remains constant for the whole table).</li>
| * <li>A late schema table (one in which we discover the schema as |
| * we read the table, and where the schema can change as the read |
| * progresses.)<ul> |
| * <li>Late schema table with SELECT * (we want all columns, whatever |
| * they happen to be.)</li> |
| * <li>Late schema with explicit select list (we want only certain |
| * columns when they happen to appear in the input.)</li></ul></li> |
| * </ul> |
| * |
| * <h4>Implementation Overview</h4> |
| * |
| * Major tasks of this class include: |
| * <ul> |
| * <li>Project table columns (change position and or name).</li> |
| * <li>Insert static and null columns.</li> |
| * <li>Schema smoothing. That is, if table A produces columns (a, b), but |
| * table B produces only (a), use the type of the first table's b column for the |
| * null created for the missing b in table B.</li> |
| * <li>Vector persistence: use the same set of vectors across readers as long |
| * as the reader schema does not cause a "hard" schema change (change in type, |
 * introduction of a new column).</li>
| * <li>Detection of schema changes (change of type, introduction of a new column |
 * for a <tt>SELECT *</tt> query), changing the projected schema, and reporting
| * the change downstream.</li> |
| * </ul> |
| * A projection is needed to: |
| * <ul> |
| * <li>Reorder table columns</li> |
| * <li>Select a subset of table columns</li> |
| * <li>Fill in missing select columns</li> |
| * <li>Fill in implicit or partition columns</li> |
| * </ul> |
| * Creates and returns the batch merger that does the projection. |
| * |
| * <h4>Projection</h4> |
| * |
| * To visualize this, assume we have numbered table columns, lettered |
| * implicit, null or partition columns:<pre><code> |
| * [ 1 | 2 | 3 | 4 ] Table columns in table order |
| * [ A | B | C ] Static columns |
| * </code></pre> |
| * Now, we wish to project them into select order. |
| * Let's say that the SELECT clause looked like this, with "t" |
| * indicating table columns:<pre><code> |
| * SELECT t2, t3, C, B, t1, A, t2 ... |
| * </code></pre> |
| * Then the projection looks like this:<pre><code> |
| * [ 2 | 3 | C | B | 1 | A | 2 ] |
| * </code></pre> |
| * Often, not all table columns are projected. In this case, the |
| * result set loader presents the full table schema to the reader, |
| * but actually writes only the projected columns. Suppose we |
| * have:<pre><code> |
 * SELECT t3, C, B, t1, A ...
| * </code></pre> |
| * Then the abbreviated table schema looks like this:<pre><code> |
| * [ 1 | 3 ]</code></pre> |
| * Note that table columns retain their table ordering. |
| * The projection looks like this:<pre><code> |
| * [ 2 | C | B | 1 | A ] |
| * </code></pre> |
| * <p> |
| * The projector is created once per schema, then can be reused for any |
| * number of batches. |
| * <p> |
| * Merging is done in one of two ways, depending on the input source: |
| * <ul> |
| * <li>For the table loader, the merger discards any data in the output, |
| * then exchanges the buffers from the input columns to the output, |
| * leaving projected columns empty. Note that unprojected columns must |
| * be cleared by the caller.</li> |
| * <li>For implicit and null columns, the output vector is identical |
 * to the input vector.</li>
 * </ul>
| */ |
| |
public class ScanSchemaOrchestrator {

  /** Lower bound on the configurable per-batch byte limit. */
  public static final int MIN_BATCH_BYTE_SIZE = 256 * 1024;
  /** Upper bound on the configurable per-batch byte limit. */
  public static final int MAX_BATCH_BYTE_SIZE = Integer.MAX_VALUE;
  /** Rows per batch used when the operator does not set a limit. */
  public static final int DEFAULT_BATCH_ROW_COUNT = 4096;
  /** Bytes per batch used when the operator does not set a limit. */
  public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
  /** Hard ceiling on rows per batch, imposed by the vector implementation. */
  public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;

  /**
   * Builder for the scan orchestrator. Collects the projection list,
   * batch-size limits, an optional metadata manager, custom projection
   * parsers/resolvers, and feature flags; a concrete subclass supplies
   * the scan-specific events implementation via {@link #buildEvents()}.
   * The accumulated settings are frozen into a {@link ScanSchemaOptions}
   * when the orchestrator is constructed.
   */
  public abstract static class ScanOrchestratorBuilder {

    // Type to use for null columns; when unset, Drill's default
    // (nullable INT) applies.
    private MajorType nullType;
    // Optional plug-in that provides per-reader constant columns
    // (e.g. implicit and partition columns for file scans).
    private MetadataManager metadataManager;
    private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
    private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
    // Application-specific parsers applied to the scan-level projection list.
    private List<ScanProjectionParser> parsers = new ArrayList<>();
    // Resolvers applied to each new reader schema; the metadata manager's
    // resolver is added here by withMetadata().
    private List<ReaderProjectionResolver> schemaResolvers = new ArrayList<>();
    private boolean useSchemaSmoothing;
    private boolean allowRequiredNullColumns;
    // The scan's SELECT list; may contain a wildcard.
    private List<SchemaPath> projection;
    private TypeConverter.Builder typeConverterBuilder = TypeConverter.builder();

    /**
     * Option that enables whether the scan operator starts with an empty
     * schema-only batch (the so-called "fast schema" that Drill once tried
     * to provide) or starts with a non-empty data batch (which appears to
     * be the standard since the "Empty Batches" project some time back.)
     * See more details in {@link OperatorDriver} Javadoc.
     * <p>
     * Defaults to <tt>false</tt>, meaning to <i>not</i> provide the empty
     * schema batch. DRILL-7305 explains that many operators fail when
     * presented with an empty batch, so do not enable this feature until
     * those issues are fixed. Of course, do enable the feature if you want
     * to track down the DRILL-7305 bugs.
     */

    private boolean enableSchemaBatch;

    /**
     * Option to disable empty results. An empty result occurs if no
     * reader has any data, but at least one reader can provide a schema.
     * In this case, the scan can return a single, empty batch, with
     * an associated schema. This is the correct SQL result for an
     * empty query. However, if this result triggers empty-batch bugs
     * in other operators, we can, instead, disable this feature and
     * return a null result set: no schema, no batch, just a "fast NONE",
     * an immediate return of NONE from the Volcano iterator.
     * <p>
     * Disabling this option is not desirable: it means that the user
     * gets no schema for queries that should be able to return one. So,
     * disable this option only if we cannot find or fix empty-batch
     * bugs.
     */

    public boolean disableEmptyResults;

    /**
     * Context for error messages.
     */

    private CustomErrorContext errorContext;

    /**
     * Specify an optional metadata manager. Metadata is a set of constant
     * columns with per-reader values. For file-based sources, this is usually
     * the implicit and partition columns; but it could be other items for other
     * data sources.
     * <p>
     * Also registers the manager's projection resolver so that metadata
     * columns are resolved along with reader columns.
     *
     * @param metadataMgr the application-specific metadata manager to use
     * for this scan
     */

    public void withMetadata(MetadataManager metadataMgr) {
      metadataManager = metadataMgr;
      schemaResolvers.add(metadataManager.resolver());
    }

    /**
     * Specify a custom batch record count. This is the maximum number of records
     * per batch for this scan. Readers can adjust this, but the adjustment is capped
     * at the value specified here. The value is clamped to the range
     * [1, {@link ValueVector#MAX_ROW_COUNT}].
     *
     * @param batchRecordLimit maximum records per batch
     */

    public void setBatchRecordLimit(int batchRecordLimit) {
      scanBatchRecordLimit = Math.max(1,
          Math.min(batchRecordLimit, ValueVector.MAX_ROW_COUNT));
    }

    /**
     * Specify a custom batch byte limit, clamped to the range
     * [{@link #MIN_BATCH_BYTE_SIZE}, {@link #MAX_BATCH_BYTE_SIZE}].
     *
     * @param byteLimit maximum bytes per batch
     */

    public void setBatchByteLimit(int byteLimit) {
      scanBatchByteLimit = Math.max(MIN_BATCH_BYTE_SIZE,
          Math.min(byteLimit, MAX_BATCH_BYTE_SIZE));
    }

    /**
     * Specify the type to use for null columns in place of the standard
     * nullable int. This type is used for all missing columns. (Readers
     * that need per-column control need a different mechanism.)
     *
     * @param nullType the type to use for null columns
     */

    public void setNullType(MajorType nullType) {
      this.nullType = nullType;
    }

    /**
     * Enable schema smoothing: introduces an additional level of schema
     * resolution each time a schema changes from a reader.
     *
     * @param flag true to enable schema smoothing, false to disable
     */

    public void enableSchemaSmoothing(boolean flag) {
      useSchemaSmoothing = flag;
    }

    /**
     * Allow null columns to be created as (non-nullable) REQUIRED types
     * when requested; otherwise null columns are always nullable.
     *
     * @param flag true to permit required null columns
     */
    public void allowRequiredNullColumns(boolean flag) {
      allowRequiredNullColumns = flag;
    }

    /**
     * Add a custom parser for the scan-level projection list. Parsers
     * let operators recognize special column names (such as implicit
     * file columns).
     *
     * @param parser the parser to add
     */
    public void addParser(ScanProjectionParser parser) {
      parsers.add(parser);
    }

    /**
     * Add a custom resolver used to resolve projected columns against
     * each new reader schema.
     *
     * @param resolver the resolver to add
     */
    public void addResolver(ReaderProjectionResolver resolver) {
      schemaResolvers.add(resolver);
    }

    /**
     * Set the scan's projection (SELECT) list.
     *
     * @param projection the list of projected column paths; may contain
     * a wildcard
     */
    public void setProjection(List<SchemaPath> projection) {
      this.projection = projection;
    }

    /**
     * Enable or disable the initial empty schema-only batch; see
     * {@link #enableSchemaBatch} for the caveats.
     *
     * @param option true to emit a schema-only batch first
     */
    public void enableSchemaBatch(boolean option) {
      enableSchemaBatch = option;
    }

    /**
     * Enable or disable the "fast NONE" behavior for empty results;
     * see {@link #disableEmptyResults} for the caveats.
     *
     * @param option true to suppress the single empty batch
     */
    public void disableEmptyResults(boolean option) {
      disableEmptyResults = option;
    }

    /**
     * Access the builder for the type converter used to apply a
     * provided (output) schema to reader columns.
     *
     * @return the type converter builder for further configuration
     */
    public TypeConverter.Builder typeConverterBuilder() {
      return typeConverterBuilder;
    }

    /**
     * Set the context used to enrich error messages raised during
     * the scan.
     *
     * @param context the error context for this scan
     */
    public void setContext(CustomErrorContext context) {
      this.errorContext = context;
    }

    /** @return the error context for this scan, or null if none set */
    public CustomErrorContext errorContext() {
      return errorContext;
    }

    /**
     * Build the scan operator executor from the configured events.
     * Note: {@code disableEmptyResults} is negated here because the
     * executor takes an "allow empty" flag.
     *
     * @return the scan operator executor
     */
    @VisibleForTesting
    public ScanOperatorExec buildScan() {
      return new ScanOperatorExec(buildEvents(),
          ! disableEmptyResults);
    }

    /**
     * Build the full record batch (Volcano operator) wrapper around
     * the scan executor.
     *
     * @param fragContext the fragment context for the query fragment
     * @param pop the physical operator definition for this scan
     * @return the operator record batch ready for the execution tree
     */
    public OperatorRecordBatch buildScanOperator(FragmentContext fragContext, PhysicalOperator pop) {
      return new OperatorRecordBatch(fragContext, pop, buildScan(), enableSchemaBatch);
    }

    /**
     * Create the scan-specific events implementation that drives
     * per-reader lifecycle. Supplied by the concrete scan framework.
     *
     * @return the scan operator events implementation
     */
    public abstract ScanOperatorEvents buildEvents();
  }

  /**
   * Immutable snapshot of the builder's options, shared by the
   * orchestrator and its per-reader helpers.
   */
  public static class ScanSchemaOptions {

    /**
     * Custom null type, if provided by the operator. If
     * not set, the null type is the Drill default.
     */

    public final MajorType nullType;
    public final int scanBatchRecordLimit;
    public final int scanBatchByteLimit;
    public final List<ScanProjectionParser> parsers;

    /**
     * List of resolvers used to resolve projection columns for each
     * new schema. Allows operators to introduce custom functionality
     * as a plug-in rather than by copying code or subclassing this
     * mechanism.
     */

    public final List<ReaderProjectionResolver> schemaResolvers;

    public final List<SchemaPath> projection;
    public final boolean useSchemaSmoothing;
    public final boolean allowRequiredNullColumns;
    public final TypeConverter typeConverter;

    /**
     * Context for error messages.
     */
    public final CustomErrorContext context;

    /**
     * Copy option values out of the builder. The type converter is
     * built here so that it carries the builder's error context.
     *
     * @param builder the configured builder to snapshot
     */
    protected ScanSchemaOptions(ScanOrchestratorBuilder builder) {
      nullType = builder.nullType;
      scanBatchRecordLimit = builder.scanBatchRecordLimit;
      scanBatchByteLimit = builder.scanBatchByteLimit;
      parsers = builder.parsers;
      schemaResolvers = builder.schemaResolvers;
      projection = builder.projection;
      useSchemaSmoothing = builder.useSchemaSmoothing;
      context = builder.errorContext;
      typeConverter = builder.typeConverterBuilder
          .errorContext(builder.errorContext)
          .build();
      allowRequiredNullColumns = builder.allowRequiredNullColumns;
    }

    /**
     * @return the provided ("output") schema from the type converter,
     * or null if no type converter or provided schema is configured
     */
    protected TupleMetadata outputSchema() {
      return typeConverter == null ? null : typeConverter.providedSchema();
    }
  }

  // Configuration

  protected final BufferAllocator allocator;
  protected final ScanSchemaOptions options;

  /**
   * Creates the metadata (file and directory) columns, if needed.
   * Never null: a no-op implementation is substituted when the
   * builder provides no manager.
   */

  public final MetadataManager metadataManager;

  // Internal state

  /**
   * Cache used to preserve the same vectors from one output batch to the
   * next to keep the Project operator happy (which depends on exactly the
   * same vectors).
   * <p>
   * If the Project operator ever changes so that it depends on looking up
   * vectors rather than vector instances, this cache can be deprecated.
   */

  protected final ResultVectorCacheImpl vectorCache;
  protected final ScanLevelProjection scanProj;
  // Orchestrator for the reader currently open, if any; replaced by
  // startReader(), cleared by closeReader().
  private ReaderSchemaOrchestrator currentReader;
  // Non-null only for wildcard projection with smoothing enabled.
  protected final SchemaSmoother schemaSmoother;

  // Output

  protected VectorContainer outputContainer;

  /**
   * Construct the orchestrator: snapshot options, set up the vector
   * cache and metadata manager, parse the scan-level projection, and
   * create the output container. Note the ordering: the metadata
   * manager must be bound to the cache and its parser registered
   * (ahead of all others) before the projection list is parsed.
   *
   * @param allocator buffer allocator for output vectors
   * @param builder the configured builder holding all scan options
   */
  public ScanSchemaOrchestrator(BufferAllocator allocator, ScanOrchestratorBuilder builder) {
    this.allocator = allocator;
    this.options = new ScanSchemaOptions(builder);

    vectorCache = new ResultVectorCacheImpl(allocator, options.useSchemaSmoothing);

    // If no metadata manager was provided, create a mock
    // version just to keep code simple.

    if (builder.metadataManager == null) {
      metadataManager = new NoOpMetadataManager();
    } else {
      metadataManager = builder.metadataManager;
    }
    metadataManager.bind(vectorCache);

    // Bind metadata manager parser to scan projector.
    // A "real" (non-mock) metadata manager will provide
    // a projection parser. Use this to tell us that this
    // setup supports metadata.

    ScanProjectionParser parser = metadataManager.projectionParser();
    if (parser != null) {
      // Insert in first position so that it is ensured to see
      // any wildcard that exists
      options.parsers.add(0, parser);
    }

    // Parse the projection list.

    scanProj = ScanLevelProjection.builder()
        .projection(options.projection)
        .parsers(options.parsers)
        .outputSchema(options.outputSchema())
        .context(builder.errorContext())
        .build();
    // Smoothing applies only to wildcard queries: an explicit project
    // list already fixes the output schema.
    if (scanProj.projectAll() && options.useSchemaSmoothing) {
      schemaSmoother = new SchemaSmoother(scanProj, options.schemaResolvers);
    } else {
      schemaSmoother = null;
    }

    // Build the output container.

    outputContainer = new VectorContainer(allocator);
  }

  /**
   * Begin a new reader, closing the previous one (if any) first.
   *
   * @return the per-reader orchestrator for the new reader
   */
  public ReaderSchemaOrchestrator startReader() {
    closeReader();
    currentReader = new ReaderSchemaOrchestrator(this);
    return currentReader;
  }

  /**
   * @return true if the projection list is empty (e.g. a
   * {@code SELECT COUNT(*)} query that projects no columns)
   */
  public boolean isProjectNone() {
    return scanProj.isEmptyProjection();
  }

  /**
   * @return true if a reader is open and has produced a schema
   */
  public boolean hasSchema() {
    return currentReader != null && currentReader.hasSchema();
  }

  /**
   * @return the output container holding the scan's projected vectors;
   * the same container (and vectors) is reused across batches
   */
  public VectorContainer output() {
    return outputContainer;
  }

  /**
   * Close the current reader, if any. Safe to call when no reader
   * is open.
   */
  public void closeReader() {
    if (currentReader != null) {
      currentReader.close();
      currentReader = null;
    }
  }

  /**
   * Release all resources: the open reader, the output container's
   * vectors, the vector cache, and the metadata manager.
   */
  public void close() {
    closeReader();
    if (outputContainer != null) {
      outputContainer.clear();
      outputContainer = null;
    }
    vectorCache.close();
    metadataManager.close();
  }
}