/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3;

import java.util.Collections;
import java.util.List;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.protocol.OperatorDriver;
import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanEventListener;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanLifecycle;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;

/**
* Gathers options for the {@link ScanLifecycle}, then builds a scan lifecycle
* instance.
* <p>
* This framework is a bridge between operator logic and the scan
* internals. It gathers scan-specific options in a builder abstraction, then
* passes them on to the scan lifecycle at the right time. By abstracting out this
* plumbing, a scan batch creator simply chooses the proper framework builder, passes
* config options, and implements the matching "managed reader" and factory. All details
* of setup, projection, and so on are handled by the framework and the components
* that the framework builds upon.
*
* <h4>Inputs</h4>
*
* At this basic level, a scan framework requires just a few simple inputs:
* <ul>
* <li>The options defined by the scan projection framework such as the
* projection list.</li>
* <li>A reader factory to create a reader for each of the files or blocks
* to be scanned. (Readers are expected to be created one-by-one as files
* are read.)</li>
* <li>The operator context which provides access to a memory allocator and
* other plumbing items.</li>
* </ul>
* <p>
* In practice, there are other options to fine-tune behavior (provided schema,
* custom error context, various limits, etc.).
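* <p>
* As a rough usage sketch (the reader factory, reader, and the {@code scanPop}
* accessors below are hypothetical; the real names come from the storage
* plugin), a scan batch creator might do something like:
* <pre>{@code
* ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
* builder.projection(scanPop.getColumns());     // columns requested by the query
* builder.userName(scanPop.getUserName());      // query user, for impersonation
* builder.readerFactory(new MyReaderFactory()); // creates one ManagedReader per file or block
* return builder.buildScanOperator(fragContext, scanPop);
* }</pre>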
*/
public class ScanLifecycleBuilder {
public static final int MIN_BATCH_BYTE_SIZE = 256 * 1024;
public static final int MAX_BATCH_BYTE_SIZE = Integer.MAX_VALUE;
public static final int DEFAULT_BATCH_ROW_COUNT = 4096;
public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
/**
* Placeholder reader factory used when no reader factory is configured;
* it reports no readers, so the scan has no data to read.
*/
public static class DummyReaderFactory implements ReaderFactory<SchemaNegotiator> {
@Override
public boolean hasNext() { return false; }
@Override
public ManagedReader next(SchemaNegotiator negotiator) {
return null;
}
}
/**
* Hook for per-scan validation of the projected or resolved scan schema.
* See {@link #schemaValidator(SchemaValidator)}.
*/
public interface SchemaValidator {
void validate(ScanSchemaTracker schema);
}
private OptionSet options;
private ReaderFactory<?> readerFactory;
protected String userName;
protected MajorType nullType;
private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
protected boolean allowRequiredNullColumns;
private List<SchemaPath> projection;
protected TupleMetadata definedSchema;
protected TupleMetadata providedSchema;
/**
* Option that determines whether the scan operator starts with an empty
* schema-only batch (the so-called "fast schema" that Drill once tried
* to provide) or starts with a non-empty data batch (which appears to
* be the standard since the "Empty Batches" project some time back).
* See the {@link OperatorDriver} Javadoc for more details.
* <p>
* Defaults to <tt>false</tt>, meaning to <i>not</i> provide the empty
* schema batch. DRILL-7305 explains that many operators fail when
* presented with an empty batch, so do not enable this feature until
* those issues are fixed. Of course, do enable the feature if you want
* to track down the DRILL-7305 bugs.
*/
protected boolean enableSchemaBatch;
/**
* Option to disable empty results. An empty result occurs if no
* reader has any data, but at least one reader can provide a schema.
* In this case, the scan can return a single, empty batch, with
* an associated schema. This is the correct SQL result for an
* empty query. However, if this result triggers empty-batch bugs
* in other operators, we can, instead, disable this feature and
* return a null result set: no schema, no batch, just a "fast NONE",
* an immediate return of NONE from the Volcano iterator.
* <p>
* Disabling empty results is not desirable: it means that the user
* gets no schema for queries that should be able to return one. So,
* disable this feature only if we cannot find or fix the empty-batch
* bugs.
*/
protected boolean disableEmptyResults;
/**
* Option to disable schema changes. If {@code false}, then the first
* batch commits the scan to a single, unchanged schema. If {@code true}
* (the legacy default), then each batch or reader can change the schema,
* even though downstream operators generally cannot handle a schema
* change. The goal is to evolve all readers so that they do not
* generate schema changes.
*/
protected boolean allowSchemaChange = true;
/**
* Optional schema validator to perform per-scan checks of the
* projection or resolved schema.
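* <p>
* {@link SchemaValidator} declares a single method, so a validator can be
* supplied as a lambda; {@code isSupportedByThisReader()} below is a
* hypothetical helper:
* <pre>{@code
* builder.schemaValidator(schema -> {
*   if (!isSupportedByThisReader(schema)) {
*     throw new IllegalArgumentException("Unsupported scan schema");
*   }
* });
* }</pre>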
*/
protected SchemaValidator schemaValidator;
/**
* Context for error messages.
*/
protected CustomErrorContext errorContext;
public void options(OptionSet options) {
this.options = options;
}
public OptionSet options() {
return options;
}
public void readerFactory(ReaderFactory<?> readerFactory) {
this.readerFactory = readerFactory;
}
public void userName(String userName) {
this.userName = userName;
}
public String userName() {
return userName;
}
/**
* Specify a custom batch record count. This is the maximum number of records
* per batch for this scan. Readers can adjust this, but the adjustment is capped
* at the value specified here.
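* <p>
* For example (the second call is silently clamped by this setter):
* <pre>{@code
* builder.batchRecordLimit(10_000);            // at most 10,000 rows per batch
* builder.batchRecordLimit(Integer.MAX_VALUE); // clamped to ValueVector.MAX_ROW_COUNT
* }</pre>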
*
* @param batchRecordLimit maximum records per batch
*/
public void batchRecordLimit(int batchRecordLimit) {
scanBatchRecordLimit = Math.max(1,
Math.min(batchRecordLimit, ValueVector.MAX_ROW_COUNT));
}
public void batchByteLimit(int byteLimit) {
scanBatchByteLimit = Math.max(MIN_BATCH_BYTE_SIZE,
Math.min(byteLimit, MAX_BATCH_BYTE_SIZE));
}
/**
* Specify the type to use for null columns in place of the standard
* nullable int. This type is used for all missing columns. (Readers
* that need per-column control require a different mechanism.)
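* <p>
* For example, to report missing columns as nullable VARCHAR rather than
* the default nullable INT (using {@code Types.optional()} and
* {@code TypeProtos.MinorType} from {@code org.apache.drill.common.types}):
* <pre>{@code
* builder.nullType(Types.optional(MinorType.VARCHAR));
* }</pre>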
*
* @param nullType the type to use for null columns
*/
public void nullType(MajorType nullType) {
this.nullType = nullType;
}
public void allowRequiredNullColumns(boolean flag) {
allowRequiredNullColumns = flag;
}
public boolean allowRequiredNullColumns() {
return allowRequiredNullColumns;
}
public void allowSchemaChange(boolean flag) {
allowSchemaChange = flag;
}
public boolean allowSchemaChange() {
return allowSchemaChange;
}
public void projection(List<SchemaPath> projection) {
this.projection = projection;
}
public void enableSchemaBatch(boolean option) {
enableSchemaBatch = option;
}
public void disableEmptyResults(boolean option) {
disableEmptyResults = option;
}
public void definedSchema(TupleMetadata definedSchema) {
this.definedSchema = definedSchema;
}
public TupleMetadata definedSchema() {
return definedSchema;
}
public void providedSchema(TupleMetadata providedSchema) {
this.providedSchema = providedSchema;
}
public TupleMetadata providedSchema() {
return providedSchema;
}
public void errorContext(CustomErrorContext context) {
this.errorContext = context;
}
public CustomErrorContext errorContext() {
if (errorContext == null) {
errorContext = new EmptyErrorContext();
}
return errorContext;
}
public List<SchemaPath> projection() {
if (projection == null) {
projection = Collections.singletonList(SchemaPath.STAR_COLUMN);
}
return projection;
}
public int scanBatchRecordLimit() {
return Math.max(1, Math.min(scanBatchRecordLimit, MAX_BATCH_ROW_COUNT));
}
public int scanBatchByteLimit() {
return Math.max(MIN_BATCH_BYTE_SIZE, Math.min(scanBatchByteLimit, MAX_BATCH_BYTE_SIZE));
}
public MajorType nullType() {
return nullType;
}
public ReaderFactory<?> readerFactory() {
if (readerFactory == null) {
readerFactory = new DummyReaderFactory();
}
return readerFactory;
}
public void schemaValidator(SchemaValidator schemaValidator) {
this.schemaValidator = schemaValidator;
}
public SchemaValidator schemaValidator() { return schemaValidator; }
public ScanLifecycle build(OperatorContext context) {
return new ScanLifecycle(context, this);
}
@VisibleForTesting
public ScanOperatorExec buildScan() {
return new ScanOperatorExec(
new ScanEventListener(this),
!disableEmptyResults);
}
public OperatorRecordBatch buildScanOperator(FragmentContext fragContext, PhysicalOperator pop) {
return new OperatorRecordBatch(fragContext, pop, buildScan(), enableSchemaBatch);
}
}