/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3.file;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.FileImplicitMarker;
import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.InternalColumnMarker;
import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.PartitionColumnMarker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema;
import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema.ColumnHandle;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumn;
import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
import org.apache.drill.exec.store.ColumnExplorer.ImplicitInternalFileColumns;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Manages the resolution of implicit file metadata and partition columns.
* Parses the file metadata columns from the projection list. Creates a parse
* result which drives loading data into vectors. Supports renaming the columns
* via session options.
* <p>
* Lifecycle:
* <ul>
* <li>At the start of the scan, the parser looks for implicit and partition
* columns in the scan schema, resolving matching columns to be implicit
* columns, and building up a description of those columns for use later.</li>
 * <li>If the projection list contains a wildcard, it can also contain implicit
 * columns. In legacy mode, the resolver also inserts partition columns when
 * the query contains a wildcard.</li>
* <li>On each file (on each reader), the parse result allows generating actual
* values for each column, which are then written into the corresponding value
* vectors.</li>
* </ul>
* <p>
* Assumes that the partition count is fixed at runtime; that it is determined
* at plan time and provided in the plan. This ensures that the schema is stable
* across readers: even a reader at the top-most partition will produce columns
* for all partitions if using legacy mode wildcard expansion.
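 * <p>
 * A sketch of typical use, with the option set, file system, error context,
 * schema tracker and file descriptor assumed to be supplied by the
 * surrounding scan framework:
 * <pre>{@code
 * ImplicitColumnOptions options = new ImplicitColumnOptions()
 *     .optionSet(optionSet)
 *     .maxPartitionDepth(2)
 *     .dfs(dfs);
 * ImplicitColumnResolver resolver =
 *     new ImplicitColumnResolver(options, errorContext);
 *
 * // Once per scan: identify the implicit columns.
 * ParseResult result = resolver.parse(tracker);
 *
 * // Once per file: generate the per-file column values.
 * Object[] values = result.resolve(fileDescrip);
 * }</pre>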
*/
public class ImplicitColumnResolver {
private static final Logger logger = LoggerFactory.getLogger(ImplicitColumnResolver.class);
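
  /**
   * Builder-style options for the resolver: the session option set, the
   * maximum partition depth, legacy wildcard expansion, and the file
   * system needed by some internal columns.
   */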
public static class ImplicitColumnOptions {
protected OptionSet optionSet;
protected int maxPartitionDepth;
/**
     * Historically, Drill expands partition columns ({@code dir0},
     * {@code dir1}, ...) when the project list includes a wildcard.
*/
protected boolean useLegacyWildcardExpansion = true;
protected DrillFileSystem dfs;
public ImplicitColumnOptions optionSet(OptionSet optionSet) {
this.optionSet = optionSet;
return this;
}
/**
* The maximum partition depth for any file in this query. Specifies
     * the maximum number of {@code dirN} columns that this parser will
* recognize or generate.
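     * <p>
     * For example, when scanning {@code /data} which contains the file
     * {@code /data/2023/07/orders.csv} (an illustrative path), a depth of 2
     * allows the parser to produce {@code dir0 = "2023"} and
     * {@code dir1 = "07"}.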
*/
public ImplicitColumnOptions maxPartitionDepth(int maxPartitionDepth) {
this.maxPartitionDepth = maxPartitionDepth;
return this;
}
/**
* Indicates whether to expand partition columns when the query contains a wildcard.
     * Supports queries such as the following:
     * <pre>{@code
     * select * from dfs.`partitioned-dir`
     * }</pre>
     * in which case the output columns will be {@code (columns, dir0)} if the
     * partitioned directory has one level of nesting.
*
* See {@link TestImplicitFileColumns#testImplicitColumns}
*/
public ImplicitColumnOptions useLegacyWildcardExpansion(boolean flag) {
useLegacyWildcardExpansion = flag;
return this;
}
public ImplicitColumnOptions dfs(DrillFileSystem dfs) {
this.dfs = dfs;
return this;
}
  }

/**
* The result of scanning the scan output schema to identify implicit and
* partition columns. Defines a sub-schema of just these columns, along with
* column markers which resolve the columns for each file.
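   * <p>
   * For example, given a projection of {@code (a, filename, dir0)} (assuming
   * default implicit column names), the result schema holds just
   * {@code (filename, dir0)}, and {@link #resolve(FileDescrip)} produces the
   * matching file name and first-level directory name for each file.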
*/
public static class ParseResult {
private final List<ImplicitColumnMarker> columns;
private final TupleMetadata schema;
private final boolean isMetadataScan;
protected ParseResult(List<ImplicitColumnMarker> columns, TupleMetadata schema,
boolean isMetadataScan) {
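      // Markers are gathered in projection-list order; rearrange them into
      // output schema order using the index recorded in each marker.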
      ImplicitColumnMarker[] reordered = new ImplicitColumnMarker[columns.size()];
for (ImplicitColumnMarker col : columns) {
reordered[col.index()] = col;
}
this.columns = Arrays.asList(reordered);
this.schema = schema;
this.isMetadataScan = isMetadataScan;
}
public TupleMetadata schema() { return schema; }
public List<ImplicitColumnMarker> columns() { return columns; }
public boolean isMetadataScan() { return isMetadataScan; }
public Object[] resolve(FileDescrip fileInfo) {
      Object[] values = new Object[columns.size()];
for (int i = 0; i < values.length; i++) {
values[i] = columns.get(i).resolve(fileInfo);
}
return values;
}
  }

private static class ImplicitColumnParser {
private final ImplicitColumnResolver parser;
private final ScanSchemaTracker tracker;
private final MutableTupleSchema scanSchema;
private final List<ImplicitColumnMarker> columns = new ArrayList<>();
private final Set<Integer> referencedPartitions = new HashSet<>();
private boolean isMetadataScan;
protected ImplicitColumnParser(ImplicitColumnResolver parser, ScanSchemaTracker tracker) {
this.parser = parser;
this.tracker = tracker;
this.scanSchema = tracker.internalSchema();
}
protected ParseResult parse() {
for (ColumnHandle col : tracker.internalSchema().columns()) {
matchColumn(col);
}
if (tracker.internalSchema().projectionType() == ScanSchemaTracker.ProjectionType.ALL) {
expandWildcard();
}
// Have the tracker gather the implicit columns so they appear
// in the same order as the output schema, even if a wildcard
// appears out-of-order:
// SELECT *, fileName
// SELECT fileName, *
return new ParseResult(columns, tracker.applyImplicitCols(), isMetadataScan);
}
private void expandWildcard() {
if (!parser.useLegacyWildcardExpansion) {
return;
}
// Legacy wildcard expansion: include the file partitions for this file.
// This is a disadvantage for a * query: files at different directory
// levels will have different numbers of columns. Would be better to
// return this data as an array at some point.
// Append this after the *, keeping the * for later expansion.
for (int i = 0; i < parser.maxPartitionDepth; i++) {
if (referencedPartitions.contains(i)) {
continue;
}
ImplicitColumnMarker marker = new PartitionColumnMarker(i);
ColumnMetadata resolved = MetadataUtils.newScalar(parser.partitionName(i), PARTITION_COL_TYPE);
columns.add(marker);
tracker.expandImplicitCol(resolved, marker);
referencedPartitions.add(i);
}
}
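    // A column is implicit either because the provided schema tags it with
    // an implicit column type property, or because a dynamic (unresolved)
    // projected column matches an implicit column name or partition pattern.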
private void matchColumn(ColumnHandle col) {
String colType = SchemaUtils.implicitColType(col.column());
if (colType != null) {
resolveTaggedColumn(parser, col, colType);
return;
} else if (col.column().isDynamic()) {
matchByName(col);
}
}
private void resolveTaggedColumn(ImplicitColumnResolver parser,
ColumnHandle col, String colType) {
Matcher m = parser.partitionTypePattern.matcher(colType);
if (m.matches()) {
resolvePartitionColumn(m, parser, col);
return;
}
ImplicitFileColumn defn = parser.typeDefs.get(colType);
if (defn != null) {
resolveImplicitColumn((ImplicitFileColumns) defn, col, colType);
return;
}
resolveUnknownColumn(col, colType);
}
private void resolvePartitionColumn(Matcher m, ImplicitColumnResolver parser,
ColumnHandle col) {
// The provided schema column must be of the correct type and mode.
ColumnMetadata colSchema = col.column();
if (colSchema.type() != MinorType.VARCHAR ||
colSchema.mode() != DataMode.OPTIONAL) {
throw UserException.validationError()
.message("Provided column `%s` is marked as a parition column, but is of the wrong type",
colSchema.columnString())
.addContext("Expected type", MinorType.VARCHAR.name())
.addContext("Expected cardinality", DataMode.OPTIONAL.name())
.addContext(parser.errorContext)
.build(logger);
}
// Partition column
int partitionIndex = Integer.parseInt(m.group(1));
markImplicit(col, new PartitionColumnMarker(partitionIndex));
// Remember the partition for later wildcard expansion
referencedPartitions.add(partitionIndex);
}
private void resolveImplicitColumn(ImplicitFileColumns defn,
ColumnHandle col, String colType) {
// The provided schema column must be of the correct type and mode.
ColumnMetadata colSchema = col.column();
if (colSchema.type() != MinorType.VARCHAR ||
colSchema.mode() == DataMode.REPEATED) {
throw UserException.validationError()
.message("Provided column `%s` is marked as implicit '%s', but is of the wrong type",
colSchema.columnString(), defn.propertyValue())
.addContext("Expected type", MinorType.VARCHAR.name())
.addContext("Expected cardinality", String.format("%s or %s",
DataMode.REQUIRED.name(), DataMode.OPTIONAL.name()))
.addContext(parser.errorContext)
.build(logger);
}
markImplicit(col, new FileImplicitMarker(defn));
}
private void markImplicit(ColumnHandle col, ImplicitColumnMarker marker) {
columns.add(marker);
col.markImplicit(marker);
}
private void resolveUnknownColumn(ColumnHandle col, String colType) {
throw UserException.validationError()
.message("Provided column %s references an undefined implicit column type '%s'",
col.column().columnString(), colType)
.addContext("Expected type", MinorType.VARCHAR.name())
.addContext("Expected cardinality", String.format("%s or %s",
DataMode.REQUIRED.name(), DataMode.OPTIONAL.name()))
.addContext(parser.errorContext)
.build(logger);
}
private void matchByName(ColumnHandle col) {
Matcher m = parser.partitionPattern.matcher(col.name());
if (m.matches()) {
buildPartitionColumn(m, col);
return;
}
ImplicitFileColumn defn = parser.colDefs.get(col.name());
if (defn != null) {
buildImplicitColumn(defn, col);
}
}
private void buildPartitionColumn(Matcher m, ColumnHandle col) {
// If the projected column is a map or array, then it shadows the
// partition column. Example: dir0.x, dir0[2].
ProjectedColumn projCol = (ProjectedColumn) col.column();
if (!projCol.isSimple()) {
logger.warn("Projected column {} shadows partition column {}",
projCol.projectString(), col.name());
return;
}
// Partition column
int partitionIndex = Integer.parseInt(m.group(1));
resolve(col,
MetadataUtils.newScalar(col.name(), PARTITION_COL_TYPE),
new PartitionColumnMarker(partitionIndex));
// Remember the partition for later wildcard expansion
referencedPartitions.add(partitionIndex);
}
private void resolve(ColumnHandle col, ColumnMetadata resolved, ImplicitColumnMarker marker) {
columns.add(marker);
scanSchema.resolveImplicit(col, resolved, marker);
}
private void buildImplicitColumn(ImplicitFileColumn defn,
ColumnHandle col) {
// If the projected column is a map or array, then it shadows the
// metadata column. Example: filename.x, filename[2].
ProjectedColumn projCol = (ProjectedColumn) col.column();
if (!projCol.isSimple()) {
logger.warn("Projected column {} shadows implicit column {}",
projCol.projectString(), col.name());
} else if (defn instanceof ImplicitInternalFileColumns) {
resolveInternalColumn(col, (ImplicitInternalFileColumns) defn);
} else {
resolve(col,
MetadataUtils.newScalar(col.name(), IMPLICIT_COL_TYPE),
new FileImplicitMarker((ImplicitFileColumns) defn));
}
}
private void resolveInternalColumn(ColumnHandle col,
ImplicitInternalFileColumns defn) {
      // Tests may omit the DFS; production code must provide one.
if (defn == ImplicitInternalFileColumns.LAST_MODIFIED_TIME &&
parser.dfs == null) {
throw new IllegalStateException(
"Must provide a file system to use " + defn.name());
}
// Check if this is an implied metadata scan
if (defn == ImplicitInternalFileColumns.PROJECT_METADATA) {
isMetadataScan = true;
}
// TODO: Internal columns are VARCHAR for historical reasons.
// Better to use a type that fits the column purposes.
resolve(col,
MetadataUtils.newScalar(col.name(),
defn.isOptional() ? OPTIONAL_INTERNAL_COL_TYPE : IMPLICIT_COL_TYPE),
new InternalColumnMarker(defn));
}
  }

public static final MajorType IMPLICIT_COL_TYPE = Types.required(MinorType.VARCHAR);
public static final MajorType PARTITION_COL_TYPE = Types.optional(MinorType.VARCHAR);
public static final MajorType OPTIONAL_INTERNAL_COL_TYPE = Types.optional(MinorType.VARCHAR);
private final int maxPartitionDepth;
private final boolean useLegacyWildcardExpansion;
private final String partitionDesignator;
private final Pattern partitionPattern;
private final Pattern partitionTypePattern;
private final Map<String, ImplicitFileColumn> colDefs = CaseInsensitiveMap.newHashMap();
private final Map<String, ImplicitFileColumn> typeDefs = CaseInsensitiveMap.newHashMap();
private final CustomErrorContext errorContext;
  private final DrillFileSystem dfs;

public ImplicitColumnResolver(ImplicitColumnOptions options, CustomErrorContext errorContext) {
this.errorContext = errorContext;
this.maxPartitionDepth = options.maxPartitionDepth;
this.useLegacyWildcardExpansion = options.useLegacyWildcardExpansion;
this.dfs = options.dfs;
this.partitionDesignator = options.optionSet.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
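    // With the default designator "dir", the pattern matches "dir0",
    // "dir1", ... (case-insensitive).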
this.partitionPattern = Pattern.compile(partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
if (partitionDesignator.equals(ColumnMetadata.IMPLICIT_PARTITION_PREFIX)) {
this.partitionTypePattern = partitionPattern;
} else {
this.partitionTypePattern = Pattern.compile(ColumnMetadata.IMPLICIT_PARTITION_PREFIX + "(\\d+)",
Pattern.CASE_INSENSITIVE);
}
// File implicit columns: can be defined in the provided schema
for (ImplicitFileColumns defn : ImplicitFileColumns.values()) {
String colName = options.optionSet.getString(defn.optionName());
if (!Strings.isNullOrEmpty(colName)) {
this.colDefs.put(colName, defn);
}
this.typeDefs.put(defn.propertyValue(), defn);
}
// Internal implicit cols: cannot be defined in the provided schema
for (ImplicitInternalFileColumns defn : ImplicitInternalFileColumns.values()) {
String colName = options.optionSet.getString(defn.optionName());
if (!Strings.isNullOrEmpty(colName)) {
this.colDefs.put(colName, defn);
}
}
}
public ParseResult parse(ScanSchemaTracker tracker) {
return new ImplicitColumnParser(this, tracker).parse();
}
public String partitionName(int partition) {
return partitionDesignator + partition;
}
}