/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3.schema;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.v3.schema.DynamicSchemaFilter.DynamicTupleFilter;
import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema.ColumnHandle;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter.ProjResult;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.DynamicColumn;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Resolves a schema against the existing scan schema.
* Expands columns by comparing the existing scan schema with
* a "revised" (provided or reader) schema, adjusting the scan schema
* accordingly. Maps are expanded recursively. Other columns
* must match types (for concrete columns), or the type must be
* consistent with the projection (for dynamic columns).
* <ul>
* <li>Resolves a provided schema against the projection list.
* The provided schema can be strict (converts a wildcard into
* an explicit projection) or lenient (the reader can add
* additional columns to a wildcard).</li>
* <li>Resolves an early reader schema against the projection list
* and optional provided schema.</li>
* <li>Resolves a reader output schema against a dynamic schema (the
* projection list), a concrete schema (provided or from a prior reader),
* or a combination of the two.</li>
* </ul>
* <p>
* In practice, the logic is simpler: given a schema (dynamic, concrete
* or combination), further resolve the schema using the input schema
* provided. Resolve dynamic columns, verify consistency of concrete
* columns.
* <p>
* Projected columns start as <i>dynamic</i> (no type). Columns
* are resolved to a known type as a schema identifies that type.
* Subsequent schemas are obligated to use that same type to avoid
* an inconsistent schema change downstream.
* <p>
* A "resolved" projection list is a list of concrete columns: table
* columns, nulls, file metadata or partition metadata. An unresolved list
* has either table column names not yet matched to a table schema, or a
* wildcard column.
* <p>
* The idea is that the projection list moves through stages of resolution
* depending on which information is available. An "early schema" table
* provides schema information up front, and so allows fully resolving
* the projection list on table open. A "late schema" table allows only a
* partially resolved projection list, with the remainder of resolution
* happening on the first (or perhaps every) batch.
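* <p>
* As an illustrative sketch (hypothetical column names and types), a
* projection list {@code [a, b]} starts fully dynamic and is refined as
* each schema arrives:
* <pre>{@code
* a: DYNAMIC, b: DYNAMIC   // from the projection list
* a: INT,     b: DYNAMIC   // after a provided schema (a INT)
* a: INT,     b: VARCHAR   // after a reader schema (b VARCHAR)
* // A later reader reporting a as BIGINT is rejected: it would be an
* // inconsistent schema change downstream.
* }</pre>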
*/
public class ScanSchemaResolver {
private static final Logger logger = LoggerFactory.getLogger(ScanSchemaResolver.class);
/**
* Indicates the source of the schema to be analyzed.
* Each schema type has subtly different rules. The
* schema type allows us to inject those differences inline
* within the resolution process. Also, each schema carries
* a tag used for error reporting.
*/
public enum SchemaType {
STRICT_PROVIDED_SCHEMA("Provided"),
LENIENT_PROVIDED_SCHEMA("Provided"),
EARLY_READER_SCHEMA("Reader"),
FIRST_READER_SCHEMA("Reader"),
READER_SCHEMA("Reader"),
MISSING_COLS("Missing columns");
private final String source;
SchemaType(String source) {
this.source = source;
}
public String source() {
return source;
}
}
private final MutableTupleSchema schema;
private final SchemaType mode;
private final boolean allowMapAdditions;
private final String source;
private final CustomErrorContext errorContext;
private final boolean allowColumnReorder;
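/**
* Creates a resolver for a single schema source. A minimal usage sketch
* (the variable names here are illustrative assumptions, not taken from
* actual call sites):
* <pre>{@code
* ScanSchemaResolver resolver = new ScanSchemaResolver(
*     scanSchema,                          // mutable scan schema to update
*     SchemaType.LENIENT_PROVIDED_SCHEMA,  // source of the new schema
*     true,                                // allow new map members
*     errorContext);
* resolver.applySchema(providedSchema);    // merge the provided schema
* }</pre>
*/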
public ScanSchemaResolver(MutableTupleSchema schema, SchemaType mode,
boolean allowMapAdditions,
CustomErrorContext errorContext) {
this.schema = schema;
this.mode = mode;
this.errorContext = errorContext;
this.allowMapAdditions = allowMapAdditions;
this.source = mode.source();
switch (mode) {
case STRICT_PROVIDED_SCHEMA:
case LENIENT_PROVIDED_SCHEMA:
case EARLY_READER_SCHEMA:
case FIRST_READER_SCHEMA:
// Allow reordering columns when the projection is of the form
// (*, bar). Move bar to its place in the schema (foo, bar) rather
// than to the end, as is done for implicit columns.
this.allowColumnReorder = schema.projectionType() == ProjectionType.ALL;
break;
default:
this.allowColumnReorder = false;
}
}
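/**
* Applies the given source schema to the scan schema. With a wildcard
* projection under a strict provided schema, the projection collapses to
* the explicit column set ({@code SOME}) so that later readers cannot
* add columns. For other projection types (such as an empty projection),
* the source schema is ignored.
*/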
public void applySchema(TupleMetadata sourceSchema) {
switch (schema.projectionType()) {
case ALL:
projectSchema(sourceSchema);
if (mode == SchemaType.STRICT_PROVIDED_SCHEMA) {
schema.setProjectionType(ProjectionType.SOME);
}
break;
case SOME:
projectSchema(sourceSchema);
break;
default:
// Do nothing
}
}
/**
* A project list can contain implicit columns in addition to the wildcard.
* The wildcard defines the <i>insert point</i>: the point at which
* reader-defined columns are inserted as found. This version applies a
* provided schema to a projection. If we are given a query of the form
* {@code SELECT * FROM foo ORDER BY bar}, Drill will give us a projection
* list of the form {@code [`**`, `bar`]} and normal projection processing
* will project all provided columns, except {@code bar}, in place of the
* wildcard. Since this behavior differs from all other DBs, we apply special
* processing: we move the projected column into the next wildcard position,
* as if Drill had not included the extra column projection. This is a hack,
* but one that helps with ease-of-use. We apply the same rule to the first
* reader schema for the same reason.
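* <p>
* Sketch of the resulting order (hypothetical reader columns): with
* projection {@code [`**`, `bar`]} and a reader schema
* {@code (foo, bar, mumble)}, the resolved scan schema is
* {@code (foo, bar, mumble)} rather than {@code (foo, mumble, bar)}.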
*/
private void projectSchema(TupleMetadata sourceSchema) {
for (ColumnMetadata colSchema : sourceSchema) {
ColumnHandle existing = schema.find(colSchema.name());
if (existing == null) {
insertColumn(colSchema);
} else {
mergeColumn(existing, colSchema);
if (allowColumnReorder) {
schema.moveIfExplicit(colSchema.name());
}
}
}
}
/**
* Insert a new column into the schema at the wildcard insertion point.
* Columns can generally only be inserted for a wildcard, and only when
* schema change is allowed. A number of special cases occur for the
* various kinds of schemas.
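* <p>
* Sketch (hypothetical names): with a wildcard projection and a lenient
* provided schema containing {@code c INT}, {@code c} is inserted at the
* wildcard position; under {@code MISSING_COLS}, insertion is never legal.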
*/
private void insertColumn(ColumnMetadata col) {
switch (mode) {
case FIRST_READER_SCHEMA:
case READER_SCHEMA:
if (schema.projectionType() != ProjectionType.ALL) {
throw new IllegalStateException(
"Reader should not have projected an unprojected column: " + col.name());
}
break;
case EARLY_READER_SCHEMA:
case LENIENT_PROVIDED_SCHEMA:
case STRICT_PROVIDED_SCHEMA:
if (schema.projectionType() != ProjectionType.ALL || SchemaUtils.isExcludedFromWildcard(col)) {
return;
}
break;
case MISSING_COLS:
throw new IllegalStateException("Missing columns should not add new columns");
default:
throw new IllegalStateException(mode.name());
}
ColumnMetadata copy = col.copy();
schema.insert(copy);
// This is a top-level column inserted from a wildcard. If the column
// is a map, then project all its members. But, don't do that for
// a strict schema. ("Strict" means only allow members in the provided
// schema.)
if (copy.isMap() && mode != SchemaType.STRICT_PROVIDED_SCHEMA) {
SchemaUtils.markProjectAll(copy);
}
}
/**
* Merge an incoming column with an existing column which can either be
* dynamic or concrete. Special cases occur for implicit columns which are
* independent of reader schema, but which reside in the same namespace,
* causing potential conflicts.
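* <p>
* Sketch (assuming the implicit column name {@code filename}): an early
* reader schema that offers {@code filename} is ignored with a warning,
* while a reader output schema that offers it fails with a validation
* error.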
*/
private void mergeColumn(ColumnHandle col, ColumnMetadata colSchema) {
switch (mode) {
case LENIENT_PROVIDED_SCHEMA:
case STRICT_PROVIDED_SCHEMA:
// Even with a wildcard, the planner may add additional columns.
// Example: SELECT * FROM foo ORDER BY bar
// The planner will provide us with [`**`, `bar`]
break;
case EARLY_READER_SCHEMA:
// If the reader offers a column which duplicates an implicit column,
// act as if the column is not projected, but give a warning since
// the user might expect the column to be projected in a wildcard.
if (col.isImplicit()) {
logger.warn("Column {} shadows an implicit column of the same name: ignored",
colSchema.name());
return;
}
break;
default:
// The reader should not project a column with the same name as an
// implicit column. The projection filter should have prevented it.
// If projection does occur, we cannot accept the column and have
// no way to dispose of the unwanted vector.
if (col.isImplicit()) {
throw UserException.validationError()
.message("Reader column conflicts an implicit column, should not have been projected")
.addContext("Column", colSchema.name())
.addContext(errorContext)
.build(logger);
}
}
if (col.column().isDynamic()) {
mergeColumnWithDynamic(col, colSchema);
} else {
mergeWithConcrete(col.column(), colSchema);
}
}
/**
* Merge a resolved column with a dynamic column (from the project list or a dynamic
* defined schema). Verify consistency with the projection. Should have already
* been done by the projection filter for reader output columns, done here for all
* other schema types.
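* <p>
* Sketch (hypothetical names): a dynamic column {@code a} from the
* projection list, merged with a reader column {@code a: INT}, resolves
* to a concrete {@code INT} column. If either side is a map, the map is
* rebuilt and merged member by member.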
*/
private void mergeColumnWithDynamic(ColumnHandle existing, ColumnMetadata revised) {
DynamicColumn existingCol = (DynamicColumn) existing.column();
if (existingCol instanceof ProjectedColumn) {
SchemaUtils.verifyCompatibility((ProjectedColumn) existingCol, revised,
source, errorContext);
}
if (existingCol.isMap() || revised.isMap()) {
schema.replace(existing, createMap(existingCol, revised));
} else {
schema.resolve(existing, revised.copy());
}
}
/**
* Merge an incoming column with an existing resolved column. Non-map columns
* must match. Maps are merged recursively.
*/
private void mergeWithConcrete(ColumnMetadata existing,
ColumnMetadata revised) {
SchemaUtils.verifyConsistency(existing, revised, source, errorContext);
if (existing.isMap()) {
ProjectionFilter filter = new DynamicTupleFilter(existing.tupleSchema(),
allowMapAdditions, errorContext, source);
expandMapProjection(existing.tupleSchema(), filter, revised.tupleSchema());
}
}
/**
* Create a map column. The map might have an explicit projection
* ({@code m.a, m.b}). To ensure consistency with reader behavior, use the same
* projection filter as the reader to determine which provided or early reader schema
* columns to project.
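* <p>
* Sketch (hypothetical names): given the projection {@code m.a} and a
* provided map {@code m (a INT, b VARCHAR)}, only {@code m.a} is
* projected; {@code m.b} is filtered out just as it would be for a
* reader-supplied map.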
*/
private ColumnMetadata createMap(DynamicColumn projection,
ColumnMetadata revised) {
return createMap(projection,
DynamicTupleFilter.filterFor(projection, allowMapAdditions, errorContext, source),
revised);
}
/**
* Recursively create a map, including nested maps.
*/
private ColumnMetadata createMap(DynamicColumn projection, ProjectionFilter filter,
ColumnMetadata revised) {
ColumnMetadata map = revised.cloneEmpty();
SchemaUtils.mergeColProperties(map, projection);
SchemaUtils.mergeColProperties(map, revised);
copyDynamicMembers(map, projection);
// When resolving a generic column to a map, the map is treated
// as "map.*". That is, we can add additional columns later
// (assuming the scan allows it). However, if this is a strict schema,
// then strict means no additional columns are allowed.
if (!projection.isMap() && mode != SchemaType.STRICT_PROVIDED_SCHEMA) {
SchemaUtils.markProjectAll(map);
}
expandMapProjection(map.tupleSchema(), filter, revised.tupleSchema());
return map;
}
private void copyDynamicMembers(ColumnMetadata map, DynamicColumn projection) {
if (projection.isMap()) {
TupleMetadata mapSchema = map.tupleSchema();
for (ColumnMetadata col : projection.tupleSchema()) {
mapSchema.addColumn(col.copy());
}
}
}
/**
* Given an existing map, a projection filter, and an actual
* reader output, update the existing map with the reader schema,
* gated on the projection schema. Note that the projection schema
* may not be needed if the reader schema followed the projection
* filter, which was based on the projection map.
*/
private void expandMapProjection(TupleMetadata scanSchema,
ProjectionFilter filter,
TupleMetadata revisedSchema) {
for (ColumnMetadata readerCol : revisedSchema) {
resolveMember(scanSchema, filter.projection(readerCol), readerCol);
}
}
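/**
* Resolves a single map member against the result of the projection
* filter: skips unprojected members (legal only for provided and early
* reader schemas), adds newly projected members, resolves dynamic
* members, and recursively expands nested maps.
*/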
private void resolveMember(TupleMetadata scanSchema, ProjResult result,
ColumnMetadata readerCol) {
ColumnMetadata schemaCol = result.projection;
if (!result.isProjected) {
switch (mode) {
case EARLY_READER_SCHEMA:
case LENIENT_PROVIDED_SCHEMA:
case STRICT_PROVIDED_SCHEMA:
break;
case FIRST_READER_SCHEMA:
case READER_SCHEMA:
if (!allowMapAdditions) {
throw new IllegalStateException("Reader should not have projected column: " + readerCol.name());
}
break;
default:
throw new IllegalStateException(mode.name());
}
} else if (schemaCol == null) {
ColumnMetadata copy = readerCol.copy();
if (readerCol.isMap()) {
SchemaUtils.markProjectAll(copy);
}
scanSchema.addColumn(copy);
} else if (schemaCol.isDynamic()) {
if (schemaCol.isMap()) {
scanSchema.replace(createMap((ProjectedColumn) schemaCol,
result.mapFilter, readerCol));
} else {
scanSchema.replace(readerCol.copy());
}
} else if (schemaCol.isMap()) {
expandMapProjection(schemaCol.tupleSchema(),
result.mapFilter, readerCol.tupleSchema());
} // else cols are identical simple
}
}