/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.project;
import java.util.ArrayList;
import java.util.List;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedColumn;
import org.apache.drill.exec.physical.impl.scan.project.AbstractUnresolvedColumn.UnresolvedWildcardColumn;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
import org.apache.drill.exec.physical.resultSet.project.ImpliedTupleRequest;
import org.apache.drill.exec.physical.resultSet.project.Projections;
import org.apache.drill.exec.physical.resultSet.project.RequestedTuple;
import org.apache.drill.exec.physical.resultSet.project.RequestedColumn;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import com.google.common.annotations.VisibleForTesting;
/**
* Parses and analyzes the projection list passed to the scanner. The
* scanner accepts a projection list and a plugin-specific set of items
* to read. The scan operator produces a series of output batches, which
* (in the best case) all have the same schema. Since Drill is "schema
* on read", in practice batch schema may evolve. The framework tries
* to "smooth" such changes where possible. An output schema adds another
* level of stability by specifying the set of columns to project (for
* wildcard queries) and the types of those columns (for all queries).
* <p>
* The projection list is per scan, independent of any tables that the
* scanner might scan. The projection list is then used as input to the
* per-table projection planning.
*
* <h4>Overview</h4>
*
* In most query engines, this kind of projection analysis is done at
* plan time. But, since Drill is schema-on-read, we don't know the
* available columns, or their types, until we start scanning a table.
* The table may provide the schema up-front, or may discover it as
* the read proceeds. Hence, the job here is to make sense of the
* project list based on static a priori information, then to create
* a list that can be further resolved against a table schema when it
* appears. This gives us two steps:
* <ul>
* <li>Scan-level projection: this class, which handles schema for the
* entire scan operator.</li>
* <li>Table-level projection: defined elsewhere, which merges the
* table and scan-level projections.</li>
* </ul>
* <p>
* Accepts the inputs needed to plan a projection, builds the mappings,
* and constructs the projection mapping object.
* <p>
* Builds the per-scan projection plan given a set of projected columns.
* Determines the output schema, which columns to project from the data
* source, which are metadata, and so on.
* <p>
* An annoying aspect of SQL is that the projection list (the list of
* columns to appear in the output) is specified after the SELECT keyword.
* In relational theory, projection is about columns, selection is about
* rows...
*
* <h4>Projection Mappings</h4>
*
* Mappings can be based on four primary use cases:
* <ul>
* <li><tt>SELECT *</tt>: Project all data source columns, whatever they happen
* to be. Create columns using names from the data source. The data source
* also determines the order of columns within the row.</li>
* <li><tt>SELECT columns</tt>: Similar to SELECT * in that it projects all columns
* from the data source, in data source order. But, rather than creating
* individual output columns for each data source column, creates a single
* column which is an array of Varchars that holds the text form of
* each column as an array element.</li>
* <li><tt>SELECT a, b, c, ...</tt>: Project a specific set of columns, identified by
* case-insensitive name. The output row uses the names from the SELECT list,
* but types from the data source. Columns appear in the row in the order
* specified by the SELECT.</li>
* <li><tt>SELECT ...</tt>: SELECT nothing, occurs in <tt>SELECT COUNT(*)</tt>
* type queries. The provided projection list contains no (table) columns, though
* it may contain metadata columns.</li>
* </ul>
* Names in the SELECT list can reference any of five distinct types of output
* columns:
* <ul>
* <li>Wildcard ("*") column: indicates the place in the projection list to insert
* the table columns once found in the table projection plan.</li>
* <li>Data source columns: columns from the underlying table. The table
* projection planner will determine if the column exists, or must be filled
* in with a null column.</li>
* <li>The generic data source columns array: <tt>columns</tt>, or optionally
* specific members of the <tt>columns</tt> array such as <tt>columns[1]</tt>.</li>
* <li>Implicit columns: <tt>fqn</tt>, <tt>filename</tt>, <tt>filepath</tt>
* and <tt>suffix</tt>. These reference
* parts of the name of the file being scanned.</li>
* <li>Partition columns: <tt>dir0</tt>, <tt>dir1</tt>, ...: These reference
* parts of the path name of the file.</li>
* </ul>
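* <p>
* For example, a hedged sketch of building a projection list that mixes
* these column types (the column names here are illustrative):
* <pre>{@code
* List<SchemaPath> cols = new ArrayList<>();
* cols.add(SchemaPath.getSimplePath("filename")); // implicit column
* cols.add(SchemaPath.getSimplePath("a"));        // data source column
* cols.add(SchemaPath.getSimplePath("dir0"));     // partition column
* }</pre>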
*
* <h4>Projection with a Schema</h4>
*
* The client can provide an <i>output schema</i> that defines the types (and
* defaults) for the tuple produced by the scan. When a schema is provided,
* the above use cases are extended as follows:
* <ul>
* <li><tt>SELECT *</tt> with strict schema: All columns in the output schema
* are projected, and only those columns. If a reader offers additional columns,
* those columns are ignored. If the reader omits columns from the output
* schema, the default value (if any) for each missing column is used.</li>
* <li><tt>SELECT *</tt> with a non-strict schema: the output tuple contains all
* columns from the output schema as explained above. In addition, if the reader
* provides any columns not in the output schema, those columns are appended to
* the end of the tuple. (That is, the output schema acts as if it were from
* an imaginary "0th" reader.)</li>
* <li>Explicit projection: only the requested columns appear, whether from the
* output schema, the reader, or as nulls.</li>
* </ul>
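* <p>
* A typical construction sketch (hedged; the projection list, parser list,
* output schema and error context are caller-supplied):
* <pre>{@code
* ScanLevelProjection scanProj = ScanLevelProjection.builder()
*     .projection(projectionList)     // SELECT-list columns
*     .parsers(parsers)               // add-on column parsers
*     .providedSchema(outputSchema)   // optional output schema
*     .errorContext(errorContext)
*     .build();
* }</pre>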
*
* @see org.apache.drill.exec.store.ColumnExplorer ColumnExplorer, the class
* from which this class evolved
*/
public class ScanLevelProjection {
/**
* Identifies the kind of projection done for this scan.
*/
public enum ScanProjectionType {
/**
* No projection. Occurs for SELECT COUNT(*) ... queries.
*/
EMPTY,
/**
* Wildcard. Occurs for SELECT * ... queries when no output schema is
* available. The scan projects all columns from all readers, using the
* type from that reader. Schema "smoothing", if enabled, will attempt
* to preserve column order, type and mode from one reader to the next.
*/
WILDCARD,
/**
* Explicit projection. Occurs for SELECT a, b, c ... queries, whether or
* not an output schema is present. In this case, the projection list
* identifies the set of columns to project and their order. The output
* schema, if present, specifies data types and modes.
*/
EXPLICIT,
/**
* Wildcard query expanded using an output schema. Occurs for a
* SELECT * ... query with an output schema. The set of projected columns
* are those from the output schema, in the order specified by the schema,
* with names (and name case) specified by the schema. In this mode, the
* schema is partial: readers may include additional columns which are
* appended to those provided by the schema.
* <p>
* TODO: Provide a strict mode that forces the use of the types and modes
* from the output schema. In lenient mode, the framework will adjust
* the mode to allow the query to succeed (changing a required mode to
* optional, say, if the column is not provided by the reader and has
* no default). Strict mode would fail the query in this case.
* <p>
* TODO: Enable schema smoothing in this case: use that mechanism to
* smooth over the "extra" reader columns.
*/
SCHEMA_WILDCARD,
/**
* Wildcard query expanded using an output schema in "strict" mode.
* Only columns from the output schema will be projected. If a reader
* offers columns not in the output schema, they will be ignored. That
* is, a SELECT * query expands to exactly the columns in the schema.
* <p>
* TODO: Provide a strict column mode that will fail the query if a projected
* column is required, has no default, and is not provided by the reader. In
* the normal lenient mode, the scan framework will adjust the data mode to
* optional so that the query will run.
*/
STRICT_SCHEMA_WILDCARD;
public boolean isWildcard() {
return this == WILDCARD ||
this == SCHEMA_WILDCARD ||
this == STRICT_SCHEMA_WILDCARD;
}
}
/**
* Interface for add-on parsers; avoids the need to create
* a single, tightly-coupled parser for all types of columns.
* The main parser handles wildcards and assumes the rest of
* the columns are table columns. The add-on parser can tag
* columns as special, such as to hold metadata.
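* <p>
* A minimal sketch of an add-on parser (the column name
* {@code my_meta} and its handling are hypothetical):
* <pre>{@code
* class MyMetadataParser implements ScanProjectionParser {
*   private ScanLevelProjection builder;
*
*   public void bind(ScanLevelProjection builder) {
*     this.builder = builder;
*   }
*   public boolean parse(RequestedColumn inCol) {
*     // Claim the column so the main parser does not treat it as a
*     // table column; a real parser would also record it, for
*     // example via builder.addMetadataColumn(...).
*     return inCol.name().equalsIgnoreCase("my_meta");
*   }
*   public void validate() { }
*   public void validateColumn(ColumnProjection col) { }
*   public void build() { }
* }
* }</pre>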
*/
public interface ScanProjectionParser {
void bind(ScanLevelProjection builder);
boolean parse(RequestedColumn inCol);
void validate();
void validateColumn(ColumnProjection col);
void build();
}
public static class Builder {
private List<SchemaPath> projectionList;
private final List<ScanProjectionParser> parsers = new ArrayList<>();
private TupleMetadata providedSchema;
/**
* Context used with error messages.
*/
protected CustomErrorContext errorContext;
/**
* Specify the set of columns in the SELECT list. Since the column list
* comes from the query planner, this builder assumes that the planner
* has checked the list for syntax and uniqueness.
*
* @param projectionList list of columns in the SELECT list in SELECT list order
* @return this builder
*/
public Builder projection(List<SchemaPath> projectionList) {
this.projectionList = projectionList;
return this;
}
public Builder parsers(List<ScanProjectionParser> parsers) {
this.parsers.addAll(parsers);
return this;
}
public Builder providedSchema(TupleMetadata providedSchema) {
this.providedSchema = providedSchema;
return this;
}
public Builder errorContext(CustomErrorContext context) {
this.errorContext = context;
return this;
}
public ScanLevelProjection build() {
return new ScanLevelProjection(this);
}
public TupleMetadata providedSchema() {
return providedSchema == null || providedSchema.size() == 0
? null : providedSchema;
}
public List<SchemaPath> projectionList() {
if (projectionList == null) {
projectionList = new ArrayList<>();
projectionList.add(SchemaPath.STAR_COLUMN);
}
return projectionList;
}
}
// Input
/**
* Context used with error messages.
*/
protected final CustomErrorContext errorContext;
protected final List<SchemaPath> projectionList;
protected final TupleMetadata readerSchema;
// Configuration
protected List<ScanProjectionParser> parsers;
// Internal state
protected boolean includesWildcard;
protected boolean sawWildcard;
// Output
protected List<ColumnProjection> outputCols = new ArrayList<>();
/**
* Projection definition for the scan as a whole. Parsed form of the input
* projection list.
*/
protected RequestedTuple outputProjection;
/**
* Projection definition passed to each reader. This is the set of
* columns that the reader is asked to provide.
*/
protected ProjectionFilter readerProjection;
protected ScanProjectionType projectionType;
private ScanLevelProjection(Builder builder) {
this.projectionList = builder.projectionList();
this.parsers = builder.parsers;
this.readerSchema = builder.providedSchema();
this.errorContext = builder.errorContext;
doParse();
}
public static Builder builder() {
return new Builder();
}
/**
* Builder shortcut, primarily for tests.
*/
@VisibleForTesting
public static ScanLevelProjection build(List<SchemaPath> projectionList,
List<ScanProjectionParser> parsers) {
return new Builder()
.projection(projectionList)
.parsers(parsers)
.build();
}
/**
* Builder shortcut, primarily for tests.
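* <p>
* For example (hedged; {@code myParser} and {@code outputSchema} are
* illustrative):
* <pre>{@code
* ScanLevelProjection scanProj = ScanLevelProjection.build(
*     Collections.singletonList(SchemaPath.STAR_COLUMN),
*     Collections.singletonList(myParser),
*     outputSchema);
* }</pre>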
*/
@VisibleForTesting
public static ScanLevelProjection build(List<SchemaPath> projectionList,
List<ScanProjectionParser> parsers,
TupleMetadata outputSchema) {
return new Builder()
.projection(projectionList)
.parsers(parsers)
.providedSchema(outputSchema)
.build();
}
private void doParse() {
outputProjection = Projections.parse(projectionList);
switch (outputProjection.type()) {
case ALL:
includesWildcard = true;
projectionType = ScanProjectionType.WILDCARD;
break;
case NONE:
projectionType = ScanProjectionType.EMPTY;
break;
default:
projectionType = ScanProjectionType.EXPLICIT;
break;
}
for (ScanProjectionParser parser : parsers) {
parser.bind(this);
}
// Process projected columns.
for (RequestedColumn inCol : outputProjection.projections()) {
if (inCol.isWildcard()) {
mapWildcard(inCol);
} else {
mapColumn(inCol);
}
}
verify();
for (ScanProjectionParser parser : parsers) {
parser.build();
}
buildReaderProjection();
}
private void buildReaderProjection() {
// Create the reader projection which includes either all columns
// (saw a wildcard) or just the unresolved columns (which excludes
// implicit columns).
//
// Note that only the wildcard without schema can omit the output
// projection. With a schema, we want the schema columns (which may
// or may not correspond to reader columns).
RequestedTuple rootProjection;
if (projectionType == ScanProjectionType.EMPTY) {
rootProjection = ImpliedTupleRequest.NO_MEMBERS;
} else if (projectionType != ScanProjectionType.EXPLICIT) {
rootProjection = ImpliedTupleRequest.ALL_MEMBERS;
} else {
List<RequestedColumn> outputProj = new ArrayList<>();
for (ColumnProjection col : outputCols) {
if (col instanceof AbstractUnresolvedColumn) {
outputProj.add(((AbstractUnresolvedColumn) col).element());
}
}
rootProjection = Projections.build(outputProj);
}
readerProjection = ProjectionFilter.providedSchemaFilter(rootProjection, readerSchema, errorContext);
}
/**
* Wildcard is special: add it, then let parsers add any custom
* columns that are needed. The order is important: we want custom
* columns to follow table columns.
*/
private void mapWildcard(RequestedColumn inCol) {
// Wildcard column: this is a SELECT * query.
assert includesWildcard;
if (sawWildcard) {
throw new IllegalArgumentException("Duplicate * entry in project list");
}
// Expand strict schema columns, if provided
assert projectionType == ScanProjectionType.WILDCARD;
boolean expanded = expandOutputSchema();
// Remember the wildcard position, if we need to insert it.
// Ensures that the main wildcard expansion occurs before add-on
// columns.
int wildcardPosn = outputCols.size();
// Parsers can consume the wildcard. But, all parsers must
// have visibility to the wildcard column.
for (ScanProjectionParser parser : parsers) {
if (parser.parse(inCol)) {
wildcardPosn = -1;
}
}
// Set this flag only after the parser checks.
sawWildcard = true;
// If not consumed, put the wildcard column into the projection list as a
// placeholder to be filled in later with actual table columns.
if (expanded) {
projectionType = readerSchema.booleanProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP)
? ScanProjectionType.STRICT_SCHEMA_WILDCARD
: ScanProjectionType.SCHEMA_WILDCARD;
} else if (wildcardPosn != -1) {
outputCols.add(wildcardPosn, new UnresolvedWildcardColumn(inCol));
}
}
private boolean expandOutputSchema() {
if (readerSchema == null) {
return false;
}
// Expand the wildcard. From the perspective of the reader, this is an explicit
// projection, so enumerate the columns as though they were in the project list.
// Take the projection type from the output column's data type. That is,
// INT[] is projected as ARRAY, etc.
for (int i = 0; i < readerSchema.size(); i++) {
ColumnMetadata col = readerSchema.metadata(i);
// Skip columns tagged as "special"; those that should not expand
// automatically.
if (col.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD)) {
continue;
}
outputCols.add(new UnresolvedColumn(null, col));
}
return true;
}
/**
* Map the column into one of five categories.
* <ol>
* <li>Star column (to designate SELECT *)</li>
* <li>Partition file column (dir0, dir1, etc.)</li>
* <li>Implicit column (fqn, filepath, filename, suffix)</li>
* <li>Special <tt>columns</tt> column which holds all columns as
* an array.</li>
* <li>Table column. The actual match against the table schema
* is done later.</li>
* </ol>
*
* Actual mapping is done by parser extensions for all but the
* basic cases.
*
* @param inCol the SELECT column
*/
private void mapColumn(RequestedColumn inCol) {
// Give the extensions first crack at each column.
// Some may want to "sniff" a column, even if they
// don't fully handle it.
for (ScanProjectionParser parser : parsers) {
if (parser.parse(inCol)) {
return;
}
}
// If the project list has a wildcard, and the column is not one recognized
// by the specialized parsers above, then just ignore it. It is likely a duplicate
// column name. In any event, it will be processed by the Project operator on
// top of this scan.
if (includesWildcard) {
return;
}
// This is a desired table column.
addTableColumn(inCol);
}
private void addTableColumn(RequestedColumn inCol) {
ColumnMetadata outputCol = null;
if (readerSchema != null) {
outputCol = readerSchema.metadata(inCol.name());
}
addTableColumn(new UnresolvedColumn(inCol, outputCol));
}
public void addTableColumn(ColumnProjection outCol) {
outputCols.add(outCol);
}
public void addMetadataColumn(ColumnProjection outCol) {
outputCols.add(outCol);
}
/**
* Once all columns are identified, perform a final pass
* over the set of columns to do overall validation. Each
* add-on parser is given an opportunity to do its own
* validation.
*/
private void verify() {
// Let parsers do overall validation.
for (ScanProjectionParser parser : parsers) {
parser.validate();
}
// Validate column-by-column.
for (ColumnProjection outCol : outputCols) {
for (ScanProjectionParser parser : parsers) {
parser.validateColumn(outCol);
}
}
}
public CustomErrorContext context() { return errorContext; }
/**
* Return the set of columns from the SELECT list
* @return the SELECT list columns, in SELECT list order
*/
public List<SchemaPath> requestedCols() { return projectionList; }
/**
* The entire set of output columns, in output order. Output order is
* that specified in the SELECT (for an explicit list of columns) or
* table order (for SELECT * queries).
* @return the set of output columns in output order
*/
public List<ColumnProjection> columns() { return outputCols; }
public ScanProjectionType projectionType() { return projectionType; }
/**
* Return whether this is a SELECT * query
* @return true if this is a SELECT * query
*/
public boolean projectAll() { return projectionType.isWildcard(); }
/**
* Returns true if the projection list is empty. This usually
* indicates a <tt>SELECT COUNT(*)</tt> query (though the scan
* operator does not have the context to know that an empty
* list does, in fact, imply a count-only query...)
*
* @return true if no table columns are projected, false
* if at least one column is projected (or the query contained
* the wildcard)
*/
public boolean isEmptyProjection() { return projectionType == ScanProjectionType.EMPTY; }
public RequestedTuple rootProjection() { return outputProjection; }
public ProjectionFilter readerProjection() { return readerProjection; }
public boolean hasReaderSchema() { return readerSchema != null; }
public TupleMetadata readerSchema() { return readerSchema; }
@Override
public String toString() {
return new StringBuilder()
.append("[")
.append(getClass().getSimpleName())
.append(" projection=")
.append(outputCols.toString())
.append("]")
.toString();
}
}