blob: a11b67590530d50be2d11e476e188a5e43de412c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3.schema;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.DynamicSchemaFilter.RowSchemaFilter;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaResolver.SchemaType;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
/**
* Schema tracker for the "normal" case in which schema starts from a simple
* projection list of column names, optionally with a provided schema. The
* schema evolves by locating implicit columns, then having he reader define
* column types, and so on.
*/
public class ProjectionSchemaTracker extends AbstractSchemaTracker {
private final TupleMetadata projection;
private final boolean allowSchemaChange;
private int implicitInsertPoint;
private int readerSchemaCount;
private boolean allowMapAdditions = true;
public ProjectionSchemaTracker(TupleMetadata definedSchema,
ProjectionParseResult parseResult,
CustomErrorContext errorContext) {
super(errorContext);
this.projection = parseResult.dynamicSchema;
this.allowSchemaChange = false;
schema.copyFrom(definedSchema);
validateProjection(parseResult.dynamicSchema, definedSchema);
ProjectionType projType;
if (schema.size() == 0) {
projType = ProjectionType.NONE;
} else {
projType = ProjectionType.SOME;
}
schema.setProjectionType(projType);
this.implicitInsertPoint = -1;
checkResolved();
}
public ProjectionSchemaTracker(ProjectionParseResult parseResult, boolean allowSchemaChange,
CustomErrorContext errorContext) {
super(errorContext);
this.projection = parseResult.dynamicSchema;
this.allowSchemaChange = allowSchemaChange;
this.schema.copyFrom(projection);
// Work out the projection type: wildcard, empty, or explicit.
ProjectionType projType;
if (parseResult.isProjectAll()) {
projType = ProjectionType.ALL;
} else if (projection.isEmpty()) {
projType = ProjectionType.NONE;
this.isResolved = true;
this.allowMapAdditions = false;
} else {
projType = ProjectionType.SOME;
}
this.schema.setProjectionType(projType);
// If wildcard, record the wildcard position.
this.schema.setInsertPoint(parseResult.wildcardPosn);
this.implicitInsertPoint = parseResult.wildcardPosn;
}
@Override
public ProjectedColumn columnProjection(String colName) {
return (ProjectedColumn) projection.metadata(colName);
}
public void applyProvidedSchema(TupleMetadata providedSchema) {
boolean isStrict = SchemaUtils.isStrict(providedSchema);
new ScanSchemaResolver(schema,
isStrict ? SchemaType.STRICT_PROVIDED_SCHEMA : SchemaType.LENIENT_PROVIDED_SCHEMA,
true, errorContext)
.applySchema(providedSchema);
checkResolved();
if (isStrict) {
allowMapAdditions = false;
}
}
@Override
public void applyEarlyReaderSchema(TupleMetadata readerSchema) {
new ScanSchemaResolver(schema, SchemaType.EARLY_READER_SCHEMA, true, errorContext)
.applySchema(readerSchema);
checkResolved();
}
/**
* Set up a projection filter using the reader input schema returned
* from {@link #readerInputSchema()}.
* <ul>
* <li>If this is an empty projection (@{code SELECT COUNT(*)}), then
* noting can be projected at all.</li>
* <li>If this is an explicit projection (@code SELECT a, b)}, then
* the set of top-level columns is fixed, though the types are unknown.
* Maps allow new members depending on the map projection: a generic
* projection ({@code m}) allows new members, a specific projection
* ({@code m.a, m.b}) does not allow new members.</li>
* <li>If the schema has been resolved and is now fixed (closed), then
* no new columns are allowed either at the top level or in maps.</li>
* <li>If the schema is open ({@code SELECT *} for the first reader,
* or schema change is allowed in the second reader), and we have
* no columns, then just project everything.</li>
* <li>If the schema is open, but we have seen some columns, then
* columns can still be added, but existing columns must match the
* existing schema.</li>
* </ul>
* <p>
* Static filters handle the simple "none" and "starting from nothing
* all" cases. The dynamic schema filter handles the case of existing
* columns whether dynamic or static.
*/
@Override
public ProjectionFilter projectionFilter(CustomErrorContext errorContext) {
switch (projectionType()) {
case ALL:
// Empty schema implies we've only seen the wildcard this far.
if (schema.size() == 0) {
return ProjectionFilter.PROJECT_ALL;
}
break;
case NONE:
return ProjectionFilter.PROJECT_NONE;
default:
}
return new RowSchemaFilter(schema, allowMapAdditions, errorContext);
}
@Override
public void applyReaderSchema(TupleMetadata readerOutputSchema,
CustomErrorContext errorContext) {
SchemaType schemaType;
// The first reader can reposition columns projected with a wildcard,
// other readers cannot as we want to preserve column order after the
// first batch.
if (readerSchemaCount == 0 && allowSchemaChange) {
schemaType = SchemaType.FIRST_READER_SCHEMA;
} else {
schemaType = SchemaType.READER_SCHEMA;
}
new ScanSchemaResolver(schema, schemaType, allowMapAdditions, errorContext)
.applySchema(readerOutputSchema);
if (!allowSchemaChange) {
allowMapAdditions = false;
if (projectionType() == ProjectionType.ALL) {
schema.setProjectionType(ProjectionType.SOME);
}
}
checkResolved();
readerSchemaCount++;
}
@Override
public void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker) {
schema.insert(implicitInsertPoint++, resolved).markImplicit(marker);
}
}