DRILL-7725: Updates to the EVF2 framework
* Supports internal implicit columns
* Improves support for standard conversions
* Handles several reader corner cases
* Simplifies the file reader
closes #2073
diff --git a/common/src/test/java/org/apache/drill/categories/EvfTests.java b/common/src/test/java/org/apache/drill/categories/EvfTest.java
similarity index 97%
rename from common/src/test/java/org/apache/drill/categories/EvfTests.java
rename to common/src/test/java/org/apache/drill/categories/EvfTest.java
index 7bc771e..e2fc1a7 100644
--- a/common/src/test/java/org/apache/drill/categories/EvfTests.java
+++ b/common/src/test/java/org/apache/drill/categories/EvfTest.java
@@ -21,6 +21,6 @@
* A category for tests that test the "Extended Vector Framework"
* (EVF): the mechanism that drives the plugin-based scan operator.
*/
-public interface EvfTests {
+public interface EvfTest {
// Junit category marker
}
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java
index 6d45e06..6690df8 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java
@@ -44,7 +44,9 @@
this.tupleWriter = tupleWriter;
this.providedSchema = providedSchema;
this.errorContext = errorContext;
- this.conversions = new StandardConversions(providedSchema);
+ this.conversions = StandardConversions.builder()
+ .withSchema(providedSchema)
+ .build();
}
public ValueWriter makeWriter(String name, MinorType type, DataMode mode) {
@@ -56,7 +58,7 @@
return tupleWriter.scalar(index);
} else {
int index = tupleWriter.addColumn(providedCol);
- return conversions.converter(tupleWriter.scalar(index), type);
+ return conversions.converterFor(tupleWriter.scalar(index), type);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index b6c77ca..699eb1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -263,7 +263,7 @@
}
// Another reader available?
- if (! nextReader()) {
+ if (!nextReader()) {
finalizeResults();
return;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
index 17b3f68..842c798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
@@ -110,5 +110,6 @@
public static TupleMetadata columnsSchema() {
return new SchemaBuilder()
.addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
- .buildSchema(); }
+ .buildSchema();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java
index f31ee69..76d2616 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java
@@ -95,10 +95,10 @@
* @param properties optional framework-specific properties
* @return a function to call to prepare each string value for conversion
*/
- private Function<String,String> buildPrepare(ColumnMetadata schema,
+ private Function<String, String> buildPrepare(ColumnMetadata schema,
Map<String, String> properties) {
- String blankProp = properties.get(ColumnMetadata.BLANK_AS_PROP);
+ String blankProp = properties == null ? null : properties.get(ColumnMetadata.BLANK_AS_PROP);
if (blankProp != null) {
switch (blankProp.toLowerCase()) {
case ColumnMetadata.BLANK_AS_ZERO:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
index 7abaac5..396e741 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
@@ -24,21 +24,13 @@
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.ValueWriter;
/**
* Factory for standard conversions as outlined in the package header.
- * Can be used in two ways:
- * <ul>
- * <li>As an object with a default set of properties that apply to
- * all columns.</li>
- * <li>As a set of static functions which operate on each column
- * individually.</li>
- * </ul>
- * The object form is more typical: it allows the provided schema to
- * set general blank-handling behavior at the tuple level.
+ * Use the builder to add schema-wide properties from a provided schema,
+ * from context-specific properties, or both.
* <p>
* The class provides two kinds of information:
* <p>
@@ -111,6 +103,42 @@
}
}
+ public static class Builder {
+
+ private final Map<String, String> properties = new HashMap<>();
+
+ public Builder withSchema(TupleMetadata providedSchema) {
+ if (providedSchema != null && providedSchema.hasProperties()) {
+ properties.putAll(providedSchema.properties());
+ }
+ return this;
+ }
+
+ public Builder withProperties(Map<String, String> properties) {
+ if (properties != null) {
+ this.properties.putAll(properties);
+ }
+ return this;
+ }
+
+ public Builder property(String key, String value) {
+ if (value == null) {
+ properties.remove(key);
+ } else {
+ properties.put(key, value);
+ }
+ return this;
+ }
+
+ public Builder blankAs(String value) {
+ return property(ColumnMetadata.BLANK_AS_PROP, value);
+ }
+
+ public StandardConversions build() {
+ return new StandardConversions(properties);
+ }
+ }
+
public static final ConversionDefn IMPLICIT =
new ConversionDefn(ConversionType.IMPLICIT);
public static final ConversionDefn IMPLICIT_UNSAFE =
@@ -120,32 +148,14 @@
private final Map<String, String> properties;
- public StandardConversions() {
- this.properties = null;
- }
-
- public StandardConversions(Map<String,String> properties) {
+ private StandardConversions(Map<String, String> properties) {
this.properties = properties;
}
- public StandardConversions(String blankAsProp) {
- if (blankAsProp == null) {
- this.properties = null;
- } else {
- this.properties = new HashMap<>();
- this.properties.put(ColumnMetadata.BLANK_AS_PROP, blankAsProp);
- }
- }
+ public static Builder builder() { return new Builder(); }
- public StandardConversions(TupleMetadata providedSchema) {
- if (providedSchema == null || !providedSchema.hasProperties()) {
- this.properties = null;
- } else {
- this.properties = providedSchema.properties();
- }
- }
-
- private Map<String,String> merge(Map<String,String> specificProps) {
+ private static Map<String, String> mergeProperties(Map<String, String> properties,
+ Map<String, String> specificProps) {
if (properties == null) {
return specificProps;
} else if (specificProps == null) {
@@ -157,15 +167,19 @@
return merged;
}
- public static DirectConverter newInstance(
+ private Map<String, String> mergeProperties(Map<String, String> specificProps) {
+ return mergeProperties(properties, specificProps);
+ }
+
+ public DirectConverter newInstance(
Class<? extends DirectConverter> conversionClass, ScalarWriter baseWriter,
- Map<String,String> properties) {
+ Map<String, String> properties) {
// Try the Converter(ScalerWriter writer, Map<String, String> props) constructor first.
// This first form is optional.
try {
final Constructor<? extends DirectConverter> ctor = conversionClass.getDeclaredConstructor(ScalarWriter.class, Map.class);
- return ctor.newInstance(baseWriter, properties);
+ return ctor.newInstance(baseWriter, mergeProperties(properties));
} catch (final ReflectiveOperationException e) {
// Ignore
}
@@ -174,7 +188,7 @@
return newInstance(conversionClass, baseWriter);
}
- public static DirectConverter newInstance(
+ public DirectConverter newInstance(
Class<? extends DirectConverter> conversionClass, ScalarWriter baseWriter) {
try {
final Constructor<? extends DirectConverter> ctor = conversionClass.getDeclaredConstructor(ScalarWriter.class);
@@ -196,11 +210,11 @@
* @return a description of the conversion needed (if any), along with the
* standard conversion class, if available
*/
- public static ConversionDefn analyze(ColumnMetadata inputSchema, ColumnMetadata outputSchema) {
+ public ConversionDefn analyze(ColumnMetadata inputSchema, ColumnMetadata outputSchema) {
return analyze(inputSchema.type(), outputSchema);
}
- public static ConversionDefn analyze(MinorType inputType, ColumnMetadata outputSchema) {
+ public ConversionDefn analyze(MinorType inputType, ColumnMetadata outputSchema) {
if (inputType == outputSchema.type()) {
return new ConversionDefn(ConversionType.NONE);
}
@@ -364,8 +378,7 @@
return EXPLICIT;
}
- public static Class<? extends DirectConverter> convertFromVarchar(
- ColumnMetadata outputDefn) {
+ public Class<? extends DirectConverter> convertFromVarchar(ColumnMetadata outputDefn) {
switch (outputDefn.type()) {
case BIT:
return ConvertStringToBoolean.class;
@@ -404,75 +417,33 @@
*
* @param scalarWriter the output column writer
* @param inputType the type of the input data
- * @param properties optional properties for some string-based conversions
+ * @param columnProps optional properties for some string-based conversions
* @return a column converter, if needed and available, the input writer if
* no conversion is needed, or null if there is no conversion available
*/
- public static ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType, Map<String, String> properties) {
+ public ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType, Map<String, String> columnProps) {
ConversionDefn defn = analyze(inputType, scalarWriter.schema());
switch (defn.type) {
case EXPLICIT:
if (defn.conversionClass != null) {
- return newInstance(defn.conversionClass, scalarWriter, properties);
+ return newInstance(defn.conversionClass, scalarWriter, columnProps);
} else {
return null;
}
case IMPLICIT:
case IMPLICIT_UNSAFE:
- return newInstance(defn.conversionClass, scalarWriter, properties);
+ return newInstance(defn.conversionClass, scalarWriter, columnProps);
default:
return scalarWriter;
}
}
- public static ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType) {
+ public ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType) {
return converterFor(scalarWriter, inputType, null);
}
- public static ValueWriter converterFor(ScalarWriter scalarWriter, ColumnMetadata inputSchema) {
- return converterFor(scalarWriter, inputSchema.type(), inputSchema.properties());
- }
-
- public ValueWriter converter(ScalarWriter scalarWriter, ColumnMetadata inputSchema) {
+ public ValueWriter converterFor(ScalarWriter scalarWriter, ColumnMetadata inputSchema) {
return converterFor(scalarWriter, inputSchema.type(),
- merge(inputSchema.properties()));
- }
-
- public ValueWriter converter(ScalarWriter scalarWriter, MinorType inputType) {
- return converterFor(scalarWriter, inputType, properties);
- }
-
- /**
- * Given a desired provided schema and an actual reader schema, create a merged
- * schema that contains the provided column where available, but the reader
- * column otherwise. Copies provided properties to the output schema.
- * <p>
- * The result is the schema to use when creating column writers: it reflects
- * the type of the target vector. The reader is responsible for converting from
- * the (possibly different) reader column type to the provided column type.
- * <p>
- * Note: the provided schema should only contain types that the reader is prepared
- * to offer: there is no requirement that the reader support every possible conversion,
- * only those that make sense for that one reader.
- *
- * @param providedSchema the provided schema from {@code CREATE SCHEMA}
- * @param readerSchema the set of column types that the reader can provide
- * "natively"
- * @return a merged schema to use when creating the {@code ResultSetLoader}
- */
- public static TupleMetadata mergeSchemas(TupleMetadata providedSchema,
- TupleMetadata readerSchema) {
- if (providedSchema == null) {
- return readerSchema;
- }
- final TupleMetadata tableSchema = new TupleSchema();
- for (ColumnMetadata readerCol : readerSchema) {
- final ColumnMetadata providedCol = providedSchema.metadata(readerCol.name());
- tableSchema.addColumn(providedCol == null ? readerCol : providedCol);
- }
- if (providedSchema.hasProperties()) {
- tableSchema.properties().putAll(providedSchema.properties());
- }
- return tableSchema;
+ inputSchema.hasProperties() ? inputSchema.properties() : null);
}
}
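For reference, a sketch of how a format plugin migrates from the removed StandardConversions constructors to the new builder; the helper class, method, and parameters below are illustrative only (not part of this patch) and mirror the WriterSpec change above.

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.metadata.ColumnMetadata;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.vector.accessor.ValueWriter;

    // Hypothetical helper showing the builder-based flow.
    class ConversionExample {
      static ValueWriter varcharColumnWriter(TupleMetadata providedSchema,
          RowSetLoader rowWriter, int index) {
        // Build once per reader: schema-wide properties come from the provided
        // schema, from explicit properties, or both.
        StandardConversions conversions = StandardConversions.builder()
            .withSchema(providedSchema)            // tuple-level properties, may be null
            .blankAs(ColumnMetadata.BLANK_AS_ZERO) // optional blank-handling override
            .build();
        // Wraps the target writer in a converter when the input type (VARCHAR here)
        // differs from the vector type; returns the writer itself when types match.
        return conversions.converterFor(rowWriter.scalar(index), MinorType.VARCHAR);
      }
    }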
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/FixedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/FixedReceiver.java
new file mode 100644
index 0000000..923214a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/FixedReceiver.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleNameSpace;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Layer above the {@code ResultSetLoader} which handles standard conversions
+ * for scalar columns where the schema is known up front (i.e. "early schema".)
+ * Columns access is by both name and position, though access by position is
+ * faster and is preferred where possible for performance.
+ */
+public class FixedReceiver {
+ private static final Logger logger = LoggerFactory.getLogger(FixedReceiver.class);
+
+ public static class Builder {
+ private final SchemaNegotiator negotiator;
+ private final TupleMetadata providedSchema;
+ private final StandardConversions.Builder conversionBuilder = StandardConversions.builder();
+ private boolean isComplete;
+ private RowSetLoader rowWriter;
+
+ public Builder(SchemaNegotiator negotiator) {
+ this.negotiator = negotiator;
+ this.providedSchema = negotiator.providedSchema();
+ }
+
+ /**
+ * Provides access to the conversion builder to add custom properties.
+ */
+ public StandardConversions.Builder conversionBuilder() {
+ return conversionBuilder;
+ }
+
+ /**
+ * Mark that the reader schema provided to {@link #build(TupleMetadata)}
+ * contains all columns that this reader will deliver. Allows some
+ * optimizations. See {@link SchemaNegotiator#schemaIsComplete(boolean)}.
+ */
+ public Builder schemaIsComplete() {
+ isComplete = true;
+ return this;
+ }
+
+ /**
+ * Create a fixed receiver for the provided schema (if any) in the
+ * scan plan, and the given reader schema. Assumes no new columns will
+ * be added later in the read.
+ */
+ public FixedReceiver build(TupleMetadata readerSchema) {
+ StandardConversions conversions = conversionBuilder.build();
+ TupleMetadata writerSchema = mergeSchemas(negotiator.providedSchema(), readerSchema);
+ negotiator.tableSchema(writerSchema);
+ negotiator.schemaIsComplete(isComplete);
+ ResultSetLoader loader = negotiator.build();
+ rowWriter = loader.writer();
+ TupleNameSpace<ValueWriter> writers = new TupleNameSpace<>();
+ for (ColumnMetadata col : readerSchema) {
+ writers.add(col.name(), writerFor(col, conversions));
+ }
+ return new FixedReceiver(rowWriter, writers);
+ }
+
+ /**
+ * Given a desired provided schema and an actual reader schema, create a merged
+ * schema that contains the provided column where available, but the reader
+ * column otherwise. Copies provided properties to the output schema.
+ * <p>
+ * The result is the schema to use when creating column writers: it reflects
+ * the type of the target vector. The reader is responsible for converting from
+ * the (possibly different) reader column type to the provided column type.
+ * <p>
+ * Note: the provided schema should only contain types that the reader is prepared
+ * to offer: there is no requirement that the reader support every possible conversion,
+ * only those that make sense for that one reader.
+ *
+ * @param providedSchema the provided schema from {@code CREATE SCHEMA}
+ * @param readerSchema the set of column types that the reader can provide
+ * "natively"
+ * @return a merged schema to use when creating the {@code ResultSetLoader}
+ */
+ public static TupleMetadata mergeSchemas(TupleMetadata providedSchema,
+ TupleMetadata readerSchema) {
+ if (providedSchema == null) {
+ return readerSchema;
+ }
+ final TupleMetadata tableSchema = new TupleSchema();
+ for (ColumnMetadata readerCol : readerSchema) {
+ final ColumnMetadata providedCol = providedSchema.metadata(readerCol.name());
+ tableSchema.addColumn(providedCol == null ? readerCol : providedCol);
+ }
+ if (providedSchema.hasProperties()) {
+ tableSchema.properties().putAll(providedSchema.properties());
+ }
+ return tableSchema;
+ }
+
+ private ValueWriter writerFor(ColumnMetadata readerCol, StandardConversions conversions) {
+ if (!MetadataUtils.isScalar(readerCol)) {
+ throw UserException.internalError()
+ .message("FixedReceiver only works with scalar columns, reader column is not scalar")
+ .addContext("Column name", readerCol.name())
+ .addContext("Column type", readerCol.type().name())
+ .addContext(errorContext())
+ .build(logger);
+ }
+ ScalarWriter baseWriter = rowWriter.scalar(readerCol.name());
+ if (!rowWriter.isProjected(readerCol.name())) {
+ return baseWriter;
+ }
+ ColumnMetadata providedCol = providedCol(readerCol.name());
+ if (providedCol == null) {
+ return baseWriter;
+ }
+ if (!MetadataUtils.isScalar(providedCol)) {
+ throw UserException.validationError()
+ .message("FixedReceiver only works with scalar columns, provided column is not scalar")
+ .addContext("Provided column name", providedCol.name())
+ .addContext("Provided column type", providedCol.type().name())
+ .addContext(errorContext())
+ .build(logger);
+ }
+ if (!compatibleModes(readerCol.mode(), providedCol.mode())) {
+ throw UserException.validationError()
+ .message("Reader and provided columns have incompatible cardinality")
+ .addContext("Column name", providedCol.name())
+ .addContext("Provided column mode", providedCol.mode().name())
+ .addContext("Reader column mode", readerCol.mode().name())
+ .addContext(errorContext())
+ .build(logger);
+ }
+ return conversions.converterFor(baseWriter, readerCol.type());
+ }
+
+ private boolean compatibleModes(DataMode source, DataMode dest) {
+ return source == dest ||
+ dest == DataMode.OPTIONAL && source == DataMode.REQUIRED;
+ }
+
+ private ColumnMetadata providedCol(String name) {
+ return providedSchema == null ? null : providedSchema.metadata(name);
+ }
+
+ private CustomErrorContext errorContext() {
+ return negotiator.errorContext();
+ }
+ }
+
+ private final RowSetLoader rowWriter;
+ private final TupleNameSpace<ValueWriter> writers;
+
+ private FixedReceiver(RowSetLoader rowWriter,
+ TupleNameSpace<ValueWriter> writers) {
+ this.rowWriter = rowWriter;
+ this.writers = writers;
+ }
+
+ public static Builder builderFor(SchemaNegotiator negotiator) {
+ return new Builder(negotiator);
+ }
+
+ public boolean start() {
+ return rowWriter.start();
+ }
+
+ public ValueWriter scalar(int index) {
+ return writers.get(index);
+ }
+
+ public ValueWriter scalar(String name) {
+ return writers.get(name);
+ }
+
+ public void save() {
+ rowWriter.save();
+ }
+
+ public RowSetLoader rowWriter() { return rowWriter; }
+}
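A sketch of how a reader might drive the new FixedReceiver; the reader class, column names, and dummy data source below are assumptions, only the FixedReceiver and SchemaBuilder calls reflect this patch.

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
    import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    // Hypothetical early-schema reader fragment.
    class ExampleReader {
      private FixedReceiver receiver;
      private int rowCount;

      void open(SchemaNegotiator negotiator) {
        TupleMetadata readerSchema = new SchemaBuilder()
            .add("id", MinorType.INT)
            .add("name", MinorType.VARCHAR)
            .buildSchema();
        receiver = FixedReceiver.builderFor(negotiator)
            .schemaIsComplete()    // no columns appear after open
            .build(readerSchema);  // merges provided and reader schemas, creates writers
      }

      boolean next() {
        while (receiver.start()) {                // false once the batch is full
          if (rowCount >= 10) {
            return false;                         // end of (dummy) input
          }
          receiver.scalar(0).setInt(rowCount);    // access by position (fastest)
          receiver.scalar("name").setString("row " + rowCount++); // or by name
          receiver.save();
        }
        return true;                              // batch full, more rows remain
      }
    }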
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
index f4393f1..393fc23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
@@ -24,12 +24,17 @@
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.protocol.OperatorDriver;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanEventListener;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanLifecycle;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
@@ -80,6 +85,11 @@
}
}
+ public interface SchemaValidator {
+ void validate(ScanSchemaTracker schema);
+ }
+
+ private OptionSet options;
private ReaderFactory<?> readerFactory;
protected String userName;
protected MajorType nullType;
@@ -133,10 +143,24 @@
protected boolean allowSchemaChange = true;
/**
+ * Optional schema validator to perform per-scan checks of the
+ * projection or resolved schema.
+ */
+ protected SchemaValidator schemaValidator;
+
+ /**
* Context for error messages.
*/
protected CustomErrorContext errorContext;
+ public void options(OptionSet options) {
+ this.options = options;
+ }
+
+ public OptionSet options() {
+ return options;
+ }
+
public void readerFactory(ReaderFactory<?> readerFactory) {
this.readerFactory = readerFactory;
}
@@ -258,6 +282,12 @@
return readerFactory;
}
+ public void schemaValidator(SchemaValidator schemaValidator) {
+ this.schemaValidator = schemaValidator;
+ }
+
+ public SchemaValidator schemaValidator() { return schemaValidator; }
+
public ScanLifecycle build(OperatorContext context) {
return new ScanLifecycle(context, this);
}
@@ -266,6 +296,10 @@
public ScanOperatorExec buildScan() {
return new ScanOperatorExec(
new ScanEventListener(this),
- !disableEmptyResults);
+ !disableEmptyResults);
+ }
+
+ public OperatorRecordBatch buildScanOperator(FragmentContext fragContext, PhysicalOperator pop) {
+ return new OperatorRecordBatch(fragContext, pop, buildScan(), enableSchemaBatch);
}
}
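For illustration, how a plugin might wire the new builder hooks; the fragment context, physical operator, and default-constructor use below are assumptions, only the options(), schemaValidator(), and buildScanOperator() calls reflect this patch.

    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.physical.base.PhysicalOperator;
    import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;

    // Hypothetical plugin helper.
    class ScanBuilderExample {
      static OperatorRecordBatch buildScan(FragmentContext fragContext, PhysicalOperator scanPop) {
        FileScanLifecycleBuilder builder = new FileScanLifecycleBuilder();
        builder.options(fragContext.getOptions());  // session/system options for the scan
        builder.schemaValidator(schema -> {
          // Plugin-specific, per-scan check of the projected or resolved schema.
        });
        return builder.buildScanOperator(fragContext, scanPop);
      }
    }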
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java
index 01d9b4c..9dee1d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java
@@ -19,6 +19,7 @@
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -116,6 +117,13 @@
void setErrorContext(CustomErrorContext context);
/**
+ * Returns the error context to use for this reader: either the
+ * parent or the reader-specific context set in
+ * {@link #setErrorContext(CustomErrorContext)}.
+ */
+ CustomErrorContext errorContext();
+
+ /**
* Name of the user running the query.
*/
String userName();
@@ -134,6 +142,8 @@
*/
boolean isProjectionEmpty();
+ ProjectedColumn projectionFor(String colName);
+
/**
* Returns the provided schema, if defined. The provided schema is a
* description of the source schema viewed as a Drill schema.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java
index bb92772..c6fff11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java
@@ -17,10 +17,18 @@
*/
package org.apache.drill.exec.physical.impl.scan.v3.file;
+import java.io.IOException;
+import java.io.InputStream;
+
import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
/**
+ * Describes one file within a scan and is used to populate implicit columns.
* Specify the file name and optional selection root. If the selection root
* is provided, then partitions are defined as the portion of the file name
* that is not also part of the selection root. That is, if selection root is
@@ -28,13 +36,35 @@
*/
public class FileDescrip {
- private final Path filePath;
+ private final DrillFileSystem dfs;
+ private final FileWork fileWork;
+ private final FileSplit split;
private final String[] dirPath;
- public FileDescrip(Path filePath, Path selectionRoot) {
- this.filePath = filePath;
+ // Option to open the file as optionally compressed
+ private boolean isCompressible;
+
+ // Parquet-related attributes
+ protected Integer rowGroupIndex;
+ protected Long rowGroupStart;
+ protected Long rowGroupLength;
+
+ // Cached modification time. Cached as a string because
+ // that's the odd way we return the value.
+ private String modTime;
+
+ // Flag to indicate that the file turned out to be empty.
+ // Used to set one of the internal implicit columns.
+ protected boolean isEmpty;
+
+ public FileDescrip(DrillFileSystem dfs, FileWork fileWork, Path selectionRoot) {
+ this.dfs = dfs;
+ this.fileWork = fileWork;
+ Path path = dfs.makeQualified(fileWork.getPath());
+ this.split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
// If the data source is not a file, no file metadata is available.
+ Path filePath = fileWork.getPath();
if (selectionRoot == null || filePath == null) {
dirPath = null;
return;
@@ -57,7 +87,25 @@
}
}
- public Path filePath() { return filePath; }
+ /**
+ * Gives the Drill file system for this operator.
+ */
+ public DrillFileSystem fileSystem() { return dfs; }
+
+ /**
+ * Returns the path of the file described by this object.
+ */
+ public Path filePath() { return fileWork.getPath(); }
+
+ /**
+ * Describes the file split (path and block offset) for this scan.
+ *
+ * @return Hadoop file split object with the file path, block
+ * offset, and length.
+ */
+ public FileSplit split() { return split; }
+
+ public FileWork fileWork() { return fileWork; }
public String partition(int index) {
if (dirPath == null || dirPath.length <= index) {
@@ -70,5 +118,49 @@
return dirPath == null ? 0 : dirPath.length;
}
- public boolean isSet() { return filePath != null; }
+ public void setRowGroupAttribs(int index, long start, long length) {
+ this.rowGroupIndex = index;
+ this.rowGroupStart = start;
+ this.rowGroupLength = length;
+ }
+
+ public String getModTime() {
+ if (modTime == null) {
+ try {
+ modTime = String.valueOf(dfs.getFileStatus(filePath()).getModificationTime());
+ } catch (IOException e) {
+
+ // This is an odd place to catch and report errors. Assume that, if the file
+ // has problems, the call to open the file will fail and will return a better
+ // error message than we can provide here.
+ }
+ }
+ return modTime;
+ }
+
+ /**
+ * Explicitly set the cached modification time. For testing only.
+ */
+ @VisibleForTesting
+ public void setModTime(String modTime) {
+ this.modTime = modTime;
+ }
+
+ public void setCompressible(boolean isCompressed) {
+ this.isCompressible = isCompressed;
+ }
+
+ public boolean isCompressible() { return isCompressible; }
+
+ public InputStream open() throws IOException {
+ if (isCompressible) {
+ return dfs.openPossiblyCompressedStream(filePath());
+ } else {
+ return dfs.open(filePath());
+ }
+ }
+
+ public void markEmpty() {
+ isEmpty = true;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java
index eb4fc07..0db2cdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java
@@ -77,9 +77,10 @@
// Create the implicit columns manager
this.implicitColumnsHandler = new ImplicitFileColumnsHandler(
- context, options, vectorCache(), schemaTracker());
+ dfs, context.getFragmentContext().getOptions(),
+ options, vectorCache(), schemaTracker());
- // Bind the reader factory which intializes the list
+ // Bind the reader factory which initializes the list
// of splits from the builder.
FileReaderFactory readerFactory = (FileReaderFactory) readerFactory();
readerFactory.bind(this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java
index a791a35..85f9dbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java
@@ -17,22 +17,25 @@
*/
package org.apache.drill.exec.physical.impl.scan.v3.file;
+import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanLifecycle;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
public class FileScanLifecycleBuilder extends ScanLifecycleBuilder {
protected int maxPartitionDepth;
- protected boolean useLegacyWildcardExpansion;
+ protected boolean useLegacyWildcardExpansion = true;
protected Path rootDir;
private List<FileWork> splits;
private Configuration fsConf;
+ private boolean compressible;
public void fileSystemConfig(Configuration fsConf) {
this.fsConf = fsConf;
@@ -42,6 +45,14 @@
this.splits = splits;
}
+ /**
+ * Legacy version because the file scan operator exposes the
+ * implementation, not the interface.
+ */
+ public void fileSplitImpls(List<FileWorkImpl> splits) {
+ this.splits = new ArrayList<>(splits);
+ }
+
public void maxPartitionDepth(int maxPartitionDepth) {
this.maxPartitionDepth = maxPartitionDepth;
}
@@ -54,6 +65,10 @@
this.rootDir = rootDir;
}
+ public void compressible(boolean compressible) {
+ this.compressible = compressible;
+ }
+
public List<FileWork> splits() {
return Preconditions.checkNotNull(splits);
}
@@ -65,20 +80,16 @@
return fsConf;
}
+ public int maxPartitionDepth() { return maxPartitionDepth; }
+
+ public boolean useLegacyWildcardExpansion() { return useLegacyWildcardExpansion; }
+
+ public Path rootDir() { return rootDir; }
+
+ public boolean isCompressible() { return compressible; }
+
@Override
public ScanLifecycle build(OperatorContext context) {
return new FileScanLifecycle(context, this);
}
-
- public int maxPartitionDepth() {
- return maxPartitionDepth;
- }
-
- public boolean useLegacyWildcardExpansion() {
- return useLegacyWildcardExpansion;
- }
-
- public Path rootDir() {
- return rootDir;
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java
index 156b5e4..86dd2b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java
@@ -18,9 +18,6 @@
package org.apache.drill.exec.physical.impl.scan.v3.file;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.hadoop.mapred.FileSplit;
/**
* The file schema negotiator provides access to the Drill file system
@@ -29,20 +26,9 @@
public interface FileSchemaNegotiator extends SchemaNegotiator {
/**
- * Gives the Drill file system for this operator.
+ * Gives the file description, which holds the Drill file system,
+ * split, file work and format-specific properties. It can open the
+ * file and provides the information used to populate implicit columns.
*/
- DrillFileSystem fileSystem();
-
- /**
- * Describes the file split (path and block offset) for this scan.
- *
- * @return Hadoop file split object with the file path, block
- * offset, and length.
- */
- FileSplit split();
-
- /**
- * Returns Drill's version of the Hadoop file split.
- */
- FileWork fileWork();
+ FileDescrip file();
}
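A sketch of a reader's open path after this change, using the single file() accessor in place of the removed fileSystem()/split()/fileWork() methods; the surrounding class and error handling are simplified assumptions.

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical reader fragment.
    class FileOpenExample {
      private static final Logger logger = LoggerFactory.getLogger(FileOpenExample.class);

      static InputStream openInput(FileSchemaNegotiator negotiator) {
        FileDescrip file = negotiator.file();
        try {
          // Honors the "compressible" option set on the file scan builder.
          return file.open();
        } catch (IOException e) {
          throw UserException.dataReadError(e)
              .addContext("File", file.filePath().toString())
              .addContext(negotiator.errorContext())
              .build(logger);
        }
      }
    }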
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java
index 8457b2f..9d69a97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java
@@ -21,14 +21,13 @@
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException.Builder;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
-import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ReaderLifecycle;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.SchemaNegotiatorImpl;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
/**
@@ -49,7 +48,8 @@
@Override
public void addContext(Builder builder) {
super.addContext(builder);
- builder.addContext("File", Path.getPathWithoutSchemeAndAuthority(split.getPath()).toString());
+ FileSplit split = fileDescrip.split();
+ builder.addContext("File", split.getPath().toString());
if (split.getStart() != 0) {
builder.addContext("Offset", split.getStart());
builder.addContext("Length", split.getLength());
@@ -57,8 +57,7 @@
}
}
- private FileWork fileWork;
- private FileSplit split;
+ private FileDescrip fileDescrip;
public FileSchemaNegotiatorImpl(ReaderLifecycle readerLifecycle) {
super(readerLifecycle);
@@ -67,19 +66,11 @@
}
public void bindSplit(FileWork fileWork) {
- this.fileWork = fileWork;
- Path path = fileScan().fileSystem().makeQualified(fileWork.getPath());
- split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
+ fileDescrip = fileScan().implicitColumnsHandler().makeDescrip(fileWork);
}
@Override
- public DrillFileSystem fileSystem() { return fileScan().fileSystem(); }
-
- @Override
- public FileSplit split() { return split; }
-
- @Override
- public FileWork fileWork() { return fileWork; }
+ public FileDescrip file() { return fileDescrip; }
@Override
@SuppressWarnings("unchecked")
@@ -89,10 +80,32 @@
@Override
public StaticBatchBuilder implicitColumnsLoader() {
- return fileScan().implicitColumnsHandler().forFile(split.getPath());
+ return fileScan().implicitColumnsHandler().forFile(fileDescrip);
}
private FileScanLifecycle fileScan() {
return (FileScanLifecycle) readerLifecycle.scanLifecycle();
}
+
+ @Override
+ protected void onEndBatch() {
+
+ // If this is a metadata scan, and this file has no rows (this is
+ // the first batch and contains no data), then add a dummy row so
+ // we have something to aggregate upon.
+ ImplicitFileColumnsHandler handler = fileScan().implicitColumnsHandler();
+ if (!handler.isMetadataScan()) {
+ return;
+ }
+ ResultSetLoader tableLoader = readerLifecycle.tableLoader();
+ if (tableLoader.batchCount() == 0 && !tableLoader.hasRows()) {
+
+ // This is admittedly a hack. The table may contain non-nullable
+ // columns, but we are asking for null values for those columns.
+ // We'll fill in defaults, which is not ideal.
+ tableLoader.writer().start();
+ tableLoader.writer().save();
+ fileDescrip.markEmpty();
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnMarker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnMarker.java
new file mode 100644
index 0000000..e8c9790
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnMarker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3.file;
+
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitInternalFileColumns;
+
+/**
+ * Marks a column as implicit and provides a function to resolve an
+ * implicit column given a description of the input file.
+ */
+public interface ImplicitColumnMarker {
+ String resolve(FileDescrip fileInfo);
+
+ /**
+ * Marker for a file-based, non-internal implicit column that
+ * extracts parts of the file name as defined by the implicit
+ * column definition.
+ */
+ public class FileImplicitMarker implements ImplicitColumnMarker {
+ public final ImplicitFileColumns defn;
+
+ public FileImplicitMarker(ImplicitFileColumns defn) {
+ this.defn = defn;
+ }
+
+ @Override
+ public String resolve(FileDescrip fileInfo) {
+ return defn.getValue(fileInfo.filePath());
+ }
+ }
+
+ /**
+ * Partition column defined by a partition depth from the scan
+ * root folder. Partitions that reference non-existent directory levels
+ * are null.
+ */
+ public class PartitionColumnMarker implements ImplicitColumnMarker {
+ private final int partition;
+
+ public PartitionColumnMarker(int partition) {
+ this.partition = partition;
+ }
+
+ @Override
+ public String resolve(FileDescrip fileInfo) {
+ return fileInfo.partition(partition);
+ }
+ }
+
+ public class InternalColumnMarker implements ImplicitColumnMarker {
+ public final ImplicitInternalFileColumns defn;
+
+ public InternalColumnMarker(ImplicitInternalFileColumns defn) {
+ this.defn = defn;
+ }
+
+ @Override
+ public String resolve(FileDescrip fileInfo) {
+ switch (defn) {
+ case ROW_GROUP_INDEX:
+ return valueOf(fileInfo.rowGroupIndex);
+ case ROW_GROUP_START:
+ return valueOf(fileInfo.rowGroupStart);
+ case ROW_GROUP_LENGTH:
+ return valueOf(fileInfo.rowGroupLength);
+ case PROJECT_METADATA:
+ case USE_METADATA:
+
+ // Per the Metadata code: if the file is empty (and the record is a
+ // placeholder), return "false"; else return null for valid rows.
+ return fileInfo.isEmpty ? Boolean.FALSE.toString() : null;
+ case LAST_MODIFIED_TIME:
+ return fileInfo.getModTime();
+ default:
+ throw new IllegalStateException(defn.name());
+ }
+ }
+
+ private String valueOf(Object value) {
+ return value == null ? null : String.valueOf(value);
+ }
+ }
+}
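For illustration, how markers resolve implicit column values for one file; the helper below is hypothetical, and in real code the FileDescrip instance comes from ImplicitFileColumnsHandler.

    import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
    import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
    import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.InternalColumnMarker;
    import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.PartitionColumnMarker;
    import org.apache.drill.exec.store.ColumnExplorer.ImplicitInternalFileColumns;

    // Hypothetical snippet: each marker turns a file description into the string
    // value of one implicit column; null means "not applicable for this file".
    class MarkerExample {
      static String[] resolveFor(FileDescrip fileDescrip) {
        ImplicitColumnMarker dir0 = new PartitionColumnMarker(0);
        ImplicitColumnMarker modTime =
            new InternalColumnMarker(ImplicitInternalFileColumns.LAST_MODIFIED_TIME);
        return new String[] {
            dir0.resolve(fileDescrip),       // first partition directory, or null
            modTime.resolve(fileDescrip) };  // cached file modification time
      }
    }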
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ImplicitColumnResolver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnResolver.java
similarity index 71%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ImplicitColumnResolver.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnResolver.java
index fcc3ed8..3ec7f35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ImplicitColumnResolver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnResolver.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.physical.impl.scan.v3.schema;
+package org.apache.drill.exec.physical.impl.scan.v3.file;
import java.util.ArrayList;
import java.util.HashSet;
@@ -33,13 +33,22 @@
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.FileImplicitMarker;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.InternalColumnMarker;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.PartitionColumnMarker;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema;
import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema.ColumnHandle;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumn;
import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitInternalFileColumns;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +92,8 @@
*/
protected boolean useLegacyWildcardExpansion = true;
+ protected DrillFileSystem dfs;
+
public ImplicitColumnOptions optionSet(OptionSet optionSet) {
this.optionSet = optionSet;
return this;
@@ -111,47 +122,10 @@
useLegacyWildcardExpansion = flag;
return this;
}
- }
- /**
- * Provides a function to resolve an implicit column given a description
- * of the input file.
- */
- public interface ColumnMarker {
- String resolve(FileDescrip fileInfo);
- }
-
- /**
- * Implicit column defined by an {@link ImplicitFileColumns}.
- */
- public static class ImplicitColumnMarker implements ColumnMarker {
- private final ImplicitFileColumns defn;
-
- public ImplicitColumnMarker(ImplicitFileColumns defn) {
- this.defn = defn;
- }
-
- @Override
- public String resolve(FileDescrip fileInfo) {
- return defn.getValue(fileInfo.filePath());
- }
- }
-
- /**
- * Partition column defined by a partition depth from the scan
- * root folder. Partitions that reference non-existent directory levels
- * are null.
- */
- public static class PartitionColumnMarker implements ColumnMarker {
- private final int partition;
-
- private PartitionColumnMarker(int partition) {
- this.partition = partition;
- }
-
- @Override
- public String resolve(FileDescrip fileInfo) {
- return fileInfo.partition(partition);
+ public ImplicitColumnOptions dfs(DrillFileSystem dfs) {
+ this.dfs = dfs;
+ return this;
}
}
@@ -161,16 +135,20 @@
* column markers which resolve the columns for each file.
*/
public static class ParseResult {
- private final List<ColumnMarker> columns;
+ private final List<ImplicitColumnMarker> columns;
private final TupleMetadata schema;
+ private final boolean isMetadataScan;
- protected ParseResult(List<ColumnMarker> columns, TupleMetadata schema) {
+ protected ParseResult(List<ImplicitColumnMarker> columns, TupleMetadata schema,
+ boolean isMetadataScan) {
this.columns = columns;
this.schema = schema;
+ this.isMetadataScan = isMetadataScan;
}
public TupleMetadata schema() { return schema; }
- public List<ColumnMarker> columns() { return columns; }
+ public List<ImplicitColumnMarker> columns() { return columns; }
+ public boolean isMetadataScan() { return isMetadataScan; }
public Object[] resolve(FileDescrip fileInfo) {
Object values[] = new Object[columns.size()];
@@ -185,8 +163,9 @@
private final ImplicitColumnResolver parser;
private final ScanSchemaTracker tracker;
private final MutableTupleSchema scanSchema;
- private final List<ColumnMarker> columns = new ArrayList<>();
+ private final List<ImplicitColumnMarker> columns = new ArrayList<>();
private final Set<Integer> referencedPartitions = new HashSet<>();
+ private boolean isMetadataScan;
protected ImplicitColumnParser(ImplicitColumnResolver parser, ScanSchemaTracker tracker) {
this.parser = parser;
@@ -196,7 +175,7 @@
protected ParseResult parse() {
for (ColumnHandle col : tracker.internalSchema().columns()) {
- matchColumn(parser, col);
+ matchColumn(col);
}
if (tracker.internalSchema().projectionType() == ScanSchemaTracker.ProjectionType.ALL) {
expandWildcard();
@@ -207,7 +186,7 @@
// appears out-of-order:
// SELECT *, fileName
// SELECT fileName, *
- return new ParseResult(columns, tracker.applyImplicitCols());
+ return new ParseResult(columns, tracker.applyImplicitCols(), isMetadataScan);
}
private void expandWildcard() {
@@ -224,21 +203,21 @@
if (referencedPartitions.contains(i)) {
continue;
}
+ ImplicitColumnMarker marker = new PartitionColumnMarker(i);
ColumnMetadata resolved = MetadataUtils.newScalar(parser.partitionName(i), PARTITION_COL_TYPE);
- SchemaUtils.markAsPartition(resolved, i);
- columns.add(new PartitionColumnMarker(i));
- tracker.expandImplicitCol(resolved);
+ columns.add(marker);
+ tracker.expandImplicitCol(resolved, marker);
referencedPartitions.add(i);
}
}
- private void matchColumn(ImplicitColumnResolver parser, ColumnHandle col) {
+ private void matchColumn(ColumnHandle col) {
String colType = SchemaUtils.implicitColType(col.column());
if (colType != null) {
resolveTaggedColumn(parser, col, colType);
return;
} else if (col.column().isDynamic()) {
- matchByName(parser, col);
+ matchByName(col);
}
}
@@ -250,22 +229,23 @@
return;
}
- ImplicitFileColumns defn = parser.typeDefs.get(colType);
+ ImplicitFileColumn defn = parser.typeDefs.get(colType);
if (defn != null) {
- resolveImplicitColumn(defn, parser, col);
+ resolveImplicitColumn((ImplicitFileColumns) defn, col, colType);
return;
}
- resolveUnknownColumn(parser, col, colType);
+ resolveUnknownColumn(col, colType);
}
- private void resolvePartitionColumn(Matcher m, ImplicitColumnResolver parser, ColumnHandle col) {
+ private void resolvePartitionColumn(Matcher m, ImplicitColumnResolver parser,
+ ColumnHandle col) {
// The provided schema column must be of the correct type and mode.
ColumnMetadata colSchema = col.column();
if (colSchema.type() != MinorType.VARCHAR ||
colSchema.mode() != DataMode.OPTIONAL) {
throw UserException.validationError()
- .message("Provided column %s is marked as a parition column, but is of the wrong type",
+ .message("Provided column `%s` is marked as a partition column, but is of the wrong type",
colSchema.columnString())
.addContext("Expected type", MinorType.VARCHAR.name())
.addContext("Expected cardinality", DataMode.OPTIONAL.name())
@@ -275,22 +255,21 @@
// Partition column
int partitionIndex = Integer.parseInt(m.group(1));
- columns.add(new PartitionColumnMarker(partitionIndex));
- col.markImplicit();
+ markImplicit(col, new PartitionColumnMarker(partitionIndex));
// Remember the partition for later wildcard expansion
referencedPartitions.add(partitionIndex);
}
private void resolveImplicitColumn(ImplicitFileColumns defn,
- ImplicitColumnResolver parser, ColumnHandle col) {
+ ColumnHandle col, String colType) {
// The provided schema column must be of the correct type and mode.
ColumnMetadata colSchema = col.column();
if (colSchema.type() != MinorType.VARCHAR ||
colSchema.mode() == DataMode.REPEATED) {
throw UserException.validationError()
- .message("Provided column %s is marked as an implicit column '%s', but is of the wrong type",
+ .message("Provided column `%s` is marked as implicit '%s', but is of the wrong type",
colSchema.columnString(), defn.propertyValue())
.addContext("Expected type", MinorType.VARCHAR.name())
.addContext("Expected cardinality", String.format("%s or %s",
@@ -298,12 +277,15 @@
.addContext(parser.errorContext)
.build(logger);
}
- columns.add(new ImplicitColumnMarker(defn));
- col.markImplicit();
+ markImplicit(col, new FileImplicitMarker(defn));
}
- private void resolveUnknownColumn(ImplicitColumnResolver parser,
- ColumnHandle col, String colType) {
+ private void markImplicit(ColumnHandle col, ImplicitColumnMarker marker) {
+ columns.add(marker);
+ col.markImplicit(marker);
+ }
+
+ private void resolveUnknownColumn(ColumnHandle col, String colType) {
throw UserException.validationError()
.message("Provided column %s references an undefined implicit column type '%s'",
col.column().columnString(), colType)
@@ -314,20 +296,20 @@
.build(logger);
}
- private void matchByName(ImplicitColumnResolver parser, ColumnHandle col) {
+ private void matchByName(ColumnHandle col) {
Matcher m = parser.partitionPattern.matcher(col.name());
if (m.matches()) {
- buildPartitionColumn(m, parser, col);
+ buildPartitionColumn(m, col);
return;
}
- ImplicitFileColumns defn = parser.colDefs.get(col.name());
+ ImplicitFileColumn defn = parser.colDefs.get(col.name());
if (defn != null) {
- buildImplicitColumn(defn, parser, col);
+ buildImplicitColumn(defn, col);
}
}
- private void buildPartitionColumn(Matcher m, ImplicitColumnResolver parser, ColumnHandle col) {
+ private void buildPartitionColumn(Matcher m, ColumnHandle col) {
// If the projected column is a map or array, then it shadows the
// partition column. Example: dir0.x, dir0[2].
@@ -340,17 +322,21 @@
// Partition column
int partitionIndex = Integer.parseInt(m.group(1));
- ColumnMetadata resolved = MetadataUtils.newScalar(col.name(), PARTITION_COL_TYPE);
- SchemaUtils.markAsPartition(resolved, partitionIndex);
- columns.add(new PartitionColumnMarker(partitionIndex));
- scanSchema.resolveImplicit(col, resolved);
+ resolve(col,
+ MetadataUtils.newScalar(col.name(), PARTITION_COL_TYPE),
+ new PartitionColumnMarker(partitionIndex));
// Remember the partition for later wildcard expansion
referencedPartitions.add(partitionIndex);
}
- private void buildImplicitColumn(ImplicitFileColumns defn,
- ImplicitColumnResolver parser, ColumnHandle col) {
+ private void resolve(ColumnHandle col, ColumnMetadata resolved, ImplicitColumnMarker marker) {
+ columns.add(marker);
+ scanSchema.resolveImplicit(col, resolved, marker);
+ }
+
+ private void buildImplicitColumn(ImplicitFileColumn defn,
+ ColumnHandle col) {
// If the projected column is a map or array, then it shadows the
// metadata column. Example: filename.x, filename[2].
@@ -358,31 +344,58 @@
if (!projCol.isSimple()) {
logger.warn("Projected column {} shadows implicit column {}",
projCol.projectString(), col.name());
+ } else if (defn instanceof ImplicitInternalFileColumns) {
+ resolveInternalColumn(col, (ImplicitInternalFileColumns) defn);
} else {
- ColumnMetadata resolved = MetadataUtils.newScalar(col.name(), IMPLICIT_COL_TYPE);
- SchemaUtils.markImplicit(resolved, defn.propertyValue());
- columns.add(new ImplicitColumnMarker(defn));
- scanSchema.resolveImplicit(col, resolved);
+ resolve(col,
+ MetadataUtils.newScalar(col.name(), IMPLICIT_COL_TYPE),
+ new FileImplicitMarker((ImplicitFileColumns) defn));
}
}
+
+ private void resolveInternalColumn(ColumnHandle col,
+ ImplicitInternalFileColumns defn) {
+
+ // Tests may not provide the DFS, real code must
+ if (defn == ImplicitInternalFileColumns.LAST_MODIFIED_TIME &&
+ parser.dfs == null) {
+ throw new IllegalStateException(
+ "Must provide a file system to use " + defn.name());
+ }
+
+ // Check if this is an implied metadata scan
+ if (defn == ImplicitInternalFileColumns.PROJECT_METADATA) {
+ isMetadataScan = true;
+ }
+
+ // TODO: Internal columns are VARCHAR for historical reasons.
+ // Better to use a type that fits the column purposes.
+ resolve(col,
+ MetadataUtils.newScalar(col.name(),
+ defn.isOptional() ? OPTIONAL_INTERNAL_COL_TYPE : IMPLICIT_COL_TYPE),
+ new InternalColumnMarker(defn));
+ }
}
public static final MajorType IMPLICIT_COL_TYPE = Types.required(MinorType.VARCHAR);
public static final MajorType PARTITION_COL_TYPE = Types.optional(MinorType.VARCHAR);
+ public static final MajorType OPTIONAL_INTERNAL_COL_TYPE = Types.optional(MinorType.VARCHAR);
private final int maxPartitionDepth;
private final boolean useLegacyWildcardExpansion;
private final String partitionDesignator;
private final Pattern partitionPattern;
private final Pattern partitionTypePattern;
- private final Map<String, ImplicitFileColumns> colDefs = CaseInsensitiveMap.newHashMap();
- private final Map<String, ImplicitFileColumns> typeDefs = CaseInsensitiveMap.newHashMap();
+ private final Map<String, ImplicitFileColumn> colDefs = CaseInsensitiveMap.newHashMap();
+ private final Map<String, ImplicitFileColumn> typeDefs = CaseInsensitiveMap.newHashMap();
private final CustomErrorContext errorContext;
+ private final DrillFileSystem dfs;
public ImplicitColumnResolver(ImplicitColumnOptions options, CustomErrorContext errorContext) {
this.errorContext = errorContext;
this.maxPartitionDepth = options.maxPartitionDepth;
this.useLegacyWildcardExpansion = options.useLegacyWildcardExpansion;
+ this.dfs = options.dfs;
this.partitionDesignator = options.optionSet.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
this.partitionPattern = Pattern.compile(partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
if (partitionDesignator.equals(ColumnMetadata.IMPLICIT_PARTITION_PREFIX)) {
@@ -391,12 +404,22 @@
this.partitionTypePattern = Pattern.compile(ColumnMetadata.IMPLICIT_PARTITION_PREFIX + "(\\d+)",
Pattern.CASE_INSENSITIVE);
}
+
+ // File implicit columns: can be defined in the provided schema
for (ImplicitFileColumns defn : ImplicitFileColumns.values()) {
String colName = options.optionSet.getString(defn.optionName());
if (!Strings.isNullOrEmpty(colName)) {
this.colDefs.put(colName, defn);
}
- typeDefs.put(defn.propertyValue(), defn);
+ this.typeDefs.put(defn.propertyValue(), defn);
+ }
+
+ // Internal implicit cols: cannot be defined in the provided schema
+ for (ImplicitInternalFileColumns defn : ImplicitInternalFileColumns.values()) {
+ String colName = options.optionSet.getString(defn.optionName());
+ if (!Strings.isNullOrEmpty(colName)) {
+ this.colDefs.put(colName, defn);
+ }
}
}
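
Note: partition columns are recognized purely by name, using the designator-plus-digits pattern built in the constructor above. A self-contained sketch using plain Java regex; the "dir" designator is an assumption standing in for Drill's configured partition column label.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionNameDemo {
  public static void main(String[] args) {
    // Same shape as the resolver's pattern: designator + digits, case-insensitive.
    Pattern partitionPattern = Pattern.compile("dir(\\d+)", Pattern.CASE_INSENSITIVE);

    for (String colName : new String[] {"dir0", "DIR2", "filename", "dir"}) {
      Matcher m = partitionPattern.matcher(colName);
      if (m.matches()) {
        // Group 1 holds the partition depth, as in buildPartitionColumn().
        System.out.println(colName + " -> partition depth " + Integer.parseInt(m.group(1)));
      } else {
        System.out.println(colName + " -> not a partition column");
      }
    }
  }
}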
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java
index d6e7ef6..ee8e6b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java
@@ -19,15 +19,15 @@
import java.util.List;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ImplicitColumnOptions;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ParseResult;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder.RepeatedBatchBuilder;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ColumnMarker;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ImplicitColumnOptions;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ParseResult;
import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.hadoop.fs.Path;
/**
@@ -41,26 +41,37 @@
*/
public class ImplicitFileColumnsHandler {
+ private final DrillFileSystem dfs;
private final ImplicitColumnResolver parser;
private final ResultVectorCache vectorCache;
private final Path rootDir;
private final ParseResult parseResult;
+ private final boolean isCompressible;
- public ImplicitFileColumnsHandler(OperatorContext context, FileScanLifecycleBuilder scanOptions,
+ public ImplicitFileColumnsHandler(DrillFileSystem dfs, OptionSet options,
+ FileScanLifecycleBuilder scanOptions,
ResultVectorCache vectorCache, ScanSchemaTracker schemaTracker) {
- ImplicitColumnOptions options = new ImplicitColumnOptions()
- .optionSet(context.getFragmentContext().getOptions())
+ ImplicitColumnOptions implicitOptions = new ImplicitColumnOptions()
+ .optionSet(options)
+ .dfs(dfs)
.maxPartitionDepth(scanOptions.maxPartitionDepth())
.useLegacyWildcardExpansion(scanOptions.useLegacyWildcardExpansion());
+ this.dfs = dfs;
this.rootDir = scanOptions.rootDir();
- this.parser = new ImplicitColumnResolver(options, scanOptions.errorContext());
+ this.parser = new ImplicitColumnResolver(implicitOptions, scanOptions.errorContext());
this.vectorCache = vectorCache;
this.parseResult = parser.parse(schemaTracker);
+ this.isCompressible = scanOptions.isCompressible();
}
- public StaticBatchBuilder forFile(Path filePath) {
- FileDescrip fileInfo = new FileDescrip(filePath, rootDir);
- List<ColumnMarker> columns = parseResult.columns();
+ public FileDescrip makeDescrip(FileWork fileWork) {
+ FileDescrip descrip = new FileDescrip(dfs, fileWork, rootDir);
+ descrip.setCompressible(isCompressible);
+ return descrip;
+ }
+
+ public StaticBatchBuilder forFile(FileDescrip fileInfo) {
+ List<ImplicitColumnMarker> columns = parseResult.columns();
if (columns.isEmpty()) {
return null;
}
@@ -70,4 +81,6 @@
}
return new RepeatedBatchBuilder(vectorCache, parseResult.schema(), values);
}
+
+ public boolean isMetadataScan() { return parseResult.isMetadataScan(); }
}
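
Note: a hedged sketch of how the file-scan lifecycle might drive the handler per file under the new API. Only makeDescrip(), forFile(), and isMetadataScan() come from the class above; the wrapping method and the fileWork parameter are illustrative.

// Sketch only; assumes the handler was built by the file-scan lifecycle.
static StaticBatchBuilder implicitLoaderFor(ImplicitFileColumnsHandler handler, FileWork fileWork) {
  // Per-file description: path, root dir, compression flag, etc.
  FileDescrip descrip = handler.makeDescrip(fileWork);
  // Returns null when no implicit or partition columns are projected.
  return handler.forFile(descrip);
}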
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
index e758fc3..f322f51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
@@ -34,70 +34,68 @@
import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages the schema and batch construction for a managed reader.
- * Allows the reader itself to be as simple as possible. This class
- * implements the basic {@link RowBatchReader} protocol based on
- * three methods, and converts it to the two-method protocol of
- * the managed reader. The {@code open()} call of the
+ * Manages the schema and batch construction for a managed reader. Allows the
+ * reader itself to be as simple as possible. This class implements the basic
+ * {@link RowBatchReader} protocol based on three methods, and converts it to
+ * the two-method protocol of the managed reader. The {@code open()} call of the
* {@code RowBatchReader} is combined with the constructor of the
- * {@link ManagedReader}, enforcing the rule that the managed reader
- * is created just-in-time when it is to be used, which avoids
- * accidentally holding resources for the life of the scan.
+ * {@link ManagedReader}, enforcing the rule that the managed reader is created
+ * just-in-time when it is to be used, which avoids accidentally holding
+ * resources for the life of the scan. Also allows most of the reader's fields
+ * to be {@code final}.
* <p>
- * Coordinates the components that wrap a reader to create the final
- * output batch:
+ * Coordinates the components that wrap a reader to create the final output
+ * batch:
* <ul>
- * <li>The actual reader which load (possibly a subset of) the
- * columns requested from the input source.</li>
- * <li>Implicit columns manager instance which populates implicit
- * file columns, partition columns, and Drill's internal metadata
- * columns.</li>
- * <li>The missing columns handler which "makes up" values for projected
- * columns not read by the reader.</li>
- * <li>Batch assembler, which combines the three sources of vectors
- * to create the output batch with the schema specified by the
- * schema tracker.</li>
+ * <li>The actual reader which loads (possibly a subset of) the columns requested
+ * from the input source.</li>
+ * <li>Implicit columns manager instance which populates implicit file columns,
+ * partition columns, and Drill's internal implicit columns.</li>
+ * <li>The missing columns handler which "makes up" values for projected columns
+ * not read by the reader.</li>
+ * <li>Batch assembler, which combines the three sources of vectors to create
+ * the output batch with the schema specified by the schema tracker.</li>
* </ul>
* <p>
* This class coordinates the reader-visible aspects of the scan:
* <ul>
- * <li>The {@link SchemaNegotiator} (or subclass) which provides
- * schema-related input to the reader and which creates the reader's
- * {@link ResultSetLoader}, among other tasks. The schema negotiator
- * is specific to each kind of scan and is thus created via the
- * {@link ScanLifecycleBuilder}.</li>
- * <li>The reader, which is designed to be as simple as possible,
- * with all generic overhead tasks handled by this "shim" between
- * the scan operator and the actual reader implementation.</li>
+ * <li>The {@link SchemaNegotiator} (or subclass) which provides schema-related
+ * input to the reader and which creates the reader's {@link ResultSetLoader},
+ * among other tasks. The schema negotiator is specific to each kind of scan and
+ * is thus created via the {@link ScanLifecycleBuilder}.</li>
+ * <li>The reader, which is designed to be as simple as possible, with all
+ * generic overhead tasks handled by this "shim" between the scan operator and
+ * the actual reader implementation.</li>
* </ul>
* <p>
- * The reader is schema-driven. See {@link ScanSchemaTracker} for
- * an overview.
+ * The reader is schema-driven. See {@link ScanSchemaTracker} for an overview.
* <ul>
- * <li>The reader is given a <i>reader input schema</i>, via the
- * schema negotiator, which specifies the desired output schema.
- * The schema can be fully dynamic (a wildcard), fully defined (a
- * prior reader already chose column types), or a hybrid.</li>
- * <li>The reader can load a subset of columns. Those that are
- * left out become "missing columns" to be filled in by this
- * class.</li>
- * <li>The <i>reader output schema</i> along with implicit and missing
- * columns, together define the scan's output schema.</li>
+ * <li>The reader is given a <i>reader input schema</i>, via the schema
+ * negotiator, which specifies the desired output schema. The schema can be
+ * fully dynamic (a wildcard), fully defined (a prior reader already chose
+ * column types), or a hybrid.</li>
+ * <li>The reader can load a subset of columns. Those that are left out become
+ * "missing columns" to be filled in by this class.</li>
+ * <li>The <i>reader output schema</i> along with implicit and missing columns,
+ * together define the scan's output schema.</li>
* </ul>
* <p>
- * The framework handles the projection task so the
- * reader does not have to worry about it. Reading an unwanted column
- * is low cost: the result set loader will have provided a "dummy" column
- * writer that simply discards the value. This is just as fast as having the
- * reader use if-statements or a table to determine which columns to save.
+ * The framework handles the projection task so the reader does not have to
+ * worry about it. Reading an unwanted column is low cost: the result set loader
+ * will have provided a "dummy" column writer that simply discards the value.
+ * This is just as fast as having the reader use if-statements or a table to
+ * determine which columns to save.
*/
public class ReaderLifecycle implements RowBatchReader {
private static final Logger logger = LoggerFactory.getLogger(ReaderLifecycle.class);
+ private enum State { START, DATA, FINAL, EOF }
+
private final ScanLifecycle scanLifecycle;
protected final TupleMetadata readerInputSchema;
private ManagedReader reader;
@@ -107,12 +105,7 @@
private StaticBatchBuilder implicitColumnsLoader;
private StaticBatchBuilder missingColumnsHandler;
private OutputBatchBuilder outputBuilder;
-
- /**
- * True once the reader reports EOF. This shim may keep going for another
- * batch to handle any look-ahead row on the last batch.
- */
- private boolean eof;
+ private State state = State.START;
public ReaderLifecycle(ScanLifecycle scanLifecycle) {
this.scanLifecycle = scanLifecycle;
@@ -137,6 +130,8 @@
return reader.getClass().getSimpleName();
}
+ public ResultSetLoader tableLoader() { return tableLoader; }
+
@Override
public boolean open() {
try {
@@ -168,6 +163,7 @@
}
public ResultSetLoader buildLoader() {
+ Preconditions.checkState(state == State.START);
ResultSetOptionBuilder options = new ResultSetOptionBuilder()
.rowCountLimit(Math.min(schemaNegotiator.batchSize, scanOptions().scanBatchRecordLimit()))
.vectorCache(scanLifecycle.vectorCache())
@@ -176,12 +172,12 @@
.projectionFilter(schemaTracker().projectionFilter(errorContext()))
.readerSchema(schemaNegotiator.readerSchema);
- // Resolve the scan scame if possible.
+ // Resolve the scan schema if possible.
applyEarlySchema();
// Create the table loader
tableLoader = new ResultSetLoaderImpl(scanLifecycle.allocator(), options.build());
- implicitColumnsLoader = schemaNegotiator.implicitColumnsLoader();
+ state = State.DATA;
return tableLoader;
}
@@ -205,12 +201,12 @@
@Override
public boolean defineSchema() {
- if (!schemaNegotiator.isSchemaComplete()) {
- return false;
+ boolean hasSchema = schemaNegotiator.isSchemaComplete() && schemaTracker().isResolved();
+ if (hasSchema) {
+ tableLoader.startBatch();
+ endBatch();
}
- tableLoader.startBatch();
- endBatch();
- return true;
+ return hasSchema;
}
@Override
@@ -218,7 +214,7 @@
// The reader may report EOF, but the result set loader might
// have a lookahead row.
- if (isEof()) {
+ if (state == State.EOF) {
return false;
}
@@ -230,9 +226,11 @@
// a new batch just to learn about EOF. Don't read if the reader
// already reported EOF. In that case, we're just processing any last
// lookahead row in the result set loader.
- if (!eof) {
+ if (state == State.DATA) {
try {
- eof = !reader.next();
+ if (!reader.next()) {
+ state = State.FINAL;
+ }
} catch (UserException e) {
throw e;
} catch (Exception e) {
@@ -252,11 +250,7 @@
// Return EOF (false) only when the reader reports EOF
// and the result set loader has drained its rows from either
// this batch or lookahead rows.
- return !isEof();
- }
-
- public boolean isEof() {
- return eof && !tableLoader.hasRows();
+ return state != State.EOF;
}
/**
@@ -265,24 +259,21 @@
* table row count. Then, merge the sources.
*/
private void endBatch() {
+
+ // Let the schema negotiator finish up the batch. Needed for metadata
+ // scans on files.
+ // TODO: Modify the metadata system to handle non-file scans, then
+ // generalize the implicit columns parser, identify a new field to
+ // replace/augment fqn, and handle empty scans here.
+ schemaNegotiator.onEndBatch();
+
+ // Get the output batch
VectorContainer readerOutput = tableLoader.harvest();
- // Corner case: Reader produced no rows and defined no schema.
- // We assume this is a null file and skip it as if we got an
- // EOF from the constructor. There may be some actual case in
- // which a file is known to have no records and no columns,
- // which is an empty file. At present, there is no way to differentiate
- // an intentional empty file from a null file.
- if (readerOutput.getRecordCount() == 0 && tableLoader.schemaVersion() == 0) {
- readerOutput.clear();
- return;
- }
-
- // If not the first batch, and there are no rows, discard the batch
- // as it adds no new information.
- if (readerOutput.getRecordCount() == 0 && tableLoader.batchCount() > 1) {
+ if (readerOutput.getRecordCount() == 0 && !returnEmptyBatch(readerOutput)) {
readerOutput.clear();
outputBuilder = null;
+ state = State.EOF;
return;
}
@@ -292,6 +283,40 @@
reviseOutputProjection(tableLoader.outputSchema());
}
buildOutputBatch(readerOutput);
+ scanLifecycle.tallyBatch();
+ }
+
+ /**
+   * The reader returned no data. Determine whether the empty batch should
+   * still be returned downstream to convey schema information (return
+   * {@code true}), or simply discarded (return {@code false}).
+ */
+ private boolean returnEmptyBatch(VectorContainer readerOutput) {
+
+ // If the batch is not the first, then it conveys no new info.
+ if (scanLifecycle.batchCount() > 0) {
+ return false;
+ }
+
+ // Corner case: Reader produced no rows and defined no schema.
+    // There are three sub-cases. In the first, the reader is "late-schema":
+    // it discovers the schema as it reads, and it read nothing. In
+ // this case, we assume this is a null file and skip it as if we got an
+ // EOF from the constructor.
+ //
+ // The second sub-case is as above, but a schema was provided. In this
+ // case, we can produce an empty result set to convey that schema.
+ //
+    // A third possible, but very obscure, case occurs when a file
+    // is known to have no records and no columns. At present, there is
+ // no way to differentiate an intentional empty file from a null file.
+ if (tableLoader.schemaVersion() == 0) {
+ return schemaTracker().isResolved();
+ }
+
+ // Otherwise, we did define a schema on the first batch of the scan
+ // so we need to return it.
+ return true;
}
private void reviseOutputProjection(TupleMetadata readerOutputSchema) {
@@ -314,6 +339,12 @@
private void buildOutputBatch(VectorContainer readerContainer) {
+    // Create the implicit columns loader after the first
+    // batch so we can report whether the file is empty.
+ if (tableLoader.batchCount() == 1) {
+ implicitColumnsLoader = schemaNegotiator.implicitColumnsLoader();
+ }
+
// Get the batch results in a container.
int rowCount = readerContainer.getRecordCount();
if (implicitColumnsLoader != null) {
@@ -353,7 +384,7 @@
@Override
public int schemaVersion() {
- return tableLoader.schemaVersion();
+ return schemaTracker().schemaVersion();
}
@Override
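
Note: the boolean eof flag is replaced by a small state machine. A standalone sketch of the intended transitions; the state names come from the enum above, while the triggering conditions are paraphrased, not copied, from the code.

public class ReaderStateDemo {
  enum State { START, DATA, FINAL, EOF }

  public static void main(String[] args) {
    State state = State.START;

    // buildLoader(): the result set loader exists, reading may begin.
    state = State.DATA;

    // next(): the reader reported EOF, but the result set loader may still
    // hold a lookahead row, so we are not yet done.
    boolean readerHasMore = false;
    if (!readerHasMore) {
      state = State.FINAL;
    }

    // endBatch(): an empty batch that conveys no new schema ends the reader.
    boolean batchWorthReturning = false;
    if (!batchWorthReturning) {
      state = State.EOF;
    }
    System.out.println("Terminal state: " + state);
  }
}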
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
index 039810f..3261a05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
@@ -135,6 +135,7 @@
private final ScanLifecycleBuilder options;
private final ScanSchemaTracker schemaTracker;
private final ReaderFactory<?> readerFactory;
+ private int batchCount;
/**
* Cache used to preserve the same vectors from one output batch to the
@@ -146,17 +147,20 @@
*/
private final ResultVectorCacheImpl vectorCache;
- public ScanLifecycle(OperatorContext context, ScanLifecycleBuilder options) {
+ public ScanLifecycle(OperatorContext context, ScanLifecycleBuilder builder) {
this.context = context;
- this.options = options;
+ this.options = builder;
this.schemaTracker = new ScanSchemaConfigBuilder()
- .projection(options.projection())
- .definedSchema(options.definedSchema())
- .providedSchema(options.providedSchema())
- .allowSchemaChange(options.allowSchemaChange())
+ .projection(builder.projection())
+ .definedSchema(builder.definedSchema())
+ .providedSchema(builder.providedSchema())
+ .allowSchemaChange(builder.allowSchemaChange())
.build();
+ if (builder.schemaValidator() != null) {
+ builder.schemaValidator().validate(schemaTracker);
+ }
this.vectorCache = new ResultVectorCacheImpl(allocator(), false);
- this.readerFactory = options.readerFactory();
+ this.readerFactory = builder.readerFactory();
}
public OperatorContext context() { return context; }
@@ -167,6 +171,8 @@
public boolean hasOutputSchema() { return schemaTracker.isResolved(); }
public CustomErrorContext errorContext() { return options.errorContext(); }
public BufferAllocator allocator() { return context.getAllocator(); }
+ public void tallyBatch() { batchCount++; }
+ public int batchCount() { return batchCount; }
public RowBatchReader nextReader() {
if (readerFactory.hasNext()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
index 20b2a02..7b94cc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
@@ -23,6 +23,7 @@
import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -73,6 +74,11 @@
}
@Override
+ public ProjectedColumn projectionFor(String colName) {
+ return readerLifecycle.scanLifecycle().schemaTracker().columnProjection(colName);
+ }
+
+ @Override
public TupleMetadata providedSchema() {
return readerLifecycle.scanOptions().providedSchema();
}
@@ -92,6 +98,7 @@
return baseErrorContext;
}
+ @Override
public CustomErrorContext errorContext() {
return readerErrorContext == null ? baseErrorContext : readerErrorContext;
}
@@ -151,4 +158,6 @@
public ManagedReader newReader(ReaderFactory<?> readerFactory) throws EarlyEofException {
return ((ReaderFactory<SchemaNegotiator>) readerFactory).next(this);
}
+
+ protected void onEndBatch() { }
}
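
Note: a hedged sketch of how a reader might use the new projectionFor() hook. The column name and the decision taken are illustrative; based on the tracker code elsewhere in this patch, a null result appears to mean the column is not explicitly named in the project list.

// Sketch: inspect how a column was projected before doing expensive work.
ProjectedColumn proj = negotiator.projectionFor("columns");
if (proj != null && !proj.isSimple()) {
  // Projected as a map or array (e.g. columns[0] or columns.x);
  // the reader can tailor its parsing accordingly.
}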
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java
index 1944916..064f4b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java
@@ -74,7 +74,9 @@
for (int i = 0; i < rowCount; i++) {
writer.start();
for (int j = 0; j < n; j++) {
- writers[j].setValue(values[j]);
+ if (values[j] != null) {
+ writers[j].setValue(values[j]);
+ }
}
writer.save();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java
index 20de4f2..7646477 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java
@@ -41,6 +41,13 @@
this.errorContext = errorContext;
}
+ /**
+ * Validate a projection list against a defined-schema tuple. Recursively walks
+ * the tree of maps to validate all nested tuples.
+ *
+ * @param projection the parsed projection list
+ * @param schema the defined schema to validate against
+ */
protected static void validateProjection(TupleMetadata projection, TupleMetadata schema) {
if (projection == null || SchemaUtils.isProjectAll(projection)) {
return;
@@ -77,8 +84,25 @@
@Override
public int schemaVersion() { return schema.version(); }
+ /**
+   * Determine if the schema is resolved. Since an empty schema trivially
+   * reports itself as resolved, the {@code SELECT *} case additionally
+   * requires at least one column, which means that something (a provided
+   * schema, an early reader schema) has given us a schema. Once
+   * resolved, a schema can never become
+ * unresolved: readers are not allowed to add dynamic columns.
+ */
protected void checkResolved() {
- isResolved = schema.isResolved();
+ if (isResolved) {
+ return;
+ }
+ switch (projectionType()) {
+ case ALL:
+ isResolved = !schema.isEmpty() && schema.isResolved();
+ break;
+ default:
+ isResolved = schema.isResolved();
+ }
}
@Override
@@ -102,9 +126,8 @@
private TupleMetadata implicitColumns() {
TupleMetadata implicitCols = new TupleSchema();
for (ColumnHandle handle : schema.columns()) {
- ColumnMetadata col = handle.column();
- if (SchemaUtils.isImplicit(col)) {
- implicitCols.addColumn(col);
+ if (handle.isImplicit()) {
+ implicitCols.addColumn(handle.column());
}
}
return implicitCols;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleMetadata.java
deleted file mode 100644
index d5f46d2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleMetadata.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan.v3.schema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-/**
- * A mutable form of a tuple schema. Allows insertions (at the wildcard position),
- * and replacing columns (as the schema becomes resolved). Tracks implicit columns
- * (those not filled in by the reader).
- * <p>
- * Does not implement the {@code TupleMetadata} interface because that interface
- * has far more functionality than is needed here, and assumes that column order
- * remains fixed (and hence columns can be addressed by position) which is not
- * true for this class.
- * <p>
- * This class represents the top-level tuple (the row.) Maps are also dynamic,
- * but provide a subset of resolution options:
- * map fields cannot be implicit. They can, however, be defined,
- * provided, discovered or missing. Map columns can start unresolved
- * if the map comes from projection. A map itself can be resolved,
- * but its members may be unresolved. New map members may only be added at the
- * end (there is no equivalent of a wildcard position.)
- */
-public class MutableTupleMetadata {
-
- /**
- * Holder for a column to allow inserting and replacing columns within
- * the top-level project list. Changes to the column within the holder
- * must go through the tuple itself so we can track schema versions.
- * <p>
- * Tracks the resolution status of each individual column as
- * described for {@link ScanSchemaTracker}. Models a column throughout the
- * projection lifecycle. Columns evolve from unresolved to resolved at
- * different times. Columns are either implicit (defined by the framework)
- * or normal (defined by the reader). Columns can be defined by the
- * planner (via a defined schema), partially defined (via a provided
- * schema), or discovered by the reader. Regardless of the path
- * to definition, by the time the first batch is delivered downstream,
- * each column has an output schema which describes the data.
- */
- public static class ColumnHandle {
- private ColumnMetadata col;
- private boolean isImplicit;
-
- public ColumnHandle(ColumnMetadata col) {
- this.col = col;
- this.isImplicit = SchemaUtils.isImplicit(col);
- }
-
- public String name() {
- return col.name();
- }
-
- private void replace(ColumnMetadata col) {
- this.col = col;
- }
-
- private void resolve(ColumnMetadata col) {
- SchemaUtils.mergeColProperties(this.col, col);
- this.col = col;
- }
-
- private void resolveImplicit(ColumnMetadata col) {
- SchemaUtils.mergeColProperties(this.col, col);
- this.col = col;
- markImplicit();
- }
-
- public void markImplicit() {
- Preconditions.checkState(SchemaUtils.isImplicit(col));
- isImplicit = true;
- }
-
- public ColumnMetadata column() { return col; }
- public boolean isImplicit() { return isImplicit; }
-
- @Override
- public String toString() {
- return col.toString();
- }
- }
-
- protected final List<MutableTupleMetadata.ColumnHandle> columns = new ArrayList<>();
- protected final Map<String, MutableTupleMetadata.ColumnHandle> nameIndex =
- CaseInsensitiveMap.newHashMap();
- private ProjectionType projType;
- private int insertPoint = -1;
- private int version;
-
- public void setProjectionType(ScanSchemaTracker.ProjectionType type) {
- this.projType = type;
- }
-
- public void setInsertPoint(int insertPoint) {
- Preconditions.checkArgument(insertPoint == -1 ||
- insertPoint >= 0 && insertPoint <= size());
- this.insertPoint = insertPoint;
- }
-
- public ScanSchemaTracker.ProjectionType projectionType() { return projType; }
- public int size() { return columns.size(); }
- public int version() { return version; }
-
- /**
- * Provide the list of partially-resolved columns. Primarily for
- * the implicit column parser.
- */
- public List<MutableTupleMetadata.ColumnHandle> columns() { return columns; }
-
- public MutableTupleMetadata.ColumnHandle find(String colName) {
- return nameIndex.get(colName);
- }
-
- public void copyFrom(TupleMetadata from) {
- if (from.isEmpty()) {
- return;
- }
- for (ColumnMetadata projCol : from) {
- add(projCol.copy());
- }
- version++;
- }
-
- public void add(ColumnMetadata col) {
- MutableTupleMetadata.ColumnHandle holder = new ColumnHandle(col);
- columns.add(holder);
- addIndex(holder);
- version++;
- }
-
- public void addIndex(MutableTupleMetadata.ColumnHandle holder) {
- if (nameIndex.put(holder.column().name(), holder) != null) {
- throw new IllegalArgumentException("Duplicate scan projection column: " + holder.name());
- }
- }
-
- public void insert(int posn, ColumnMetadata col) {
- MutableTupleMetadata.ColumnHandle holder = new ColumnHandle(col);
- columns.add(posn, holder);
- addIndex(holder);
- version++;
- }
-
- public void insert(ColumnMetadata col) {
- insert(insertPoint++, col);
- }
-
- public boolean isResolved() {
- for (MutableTupleMetadata.ColumnHandle handle : columns) {
- if (!isColumnResolved(handle.column())) {
- return false;
- }
- }
- return true;
- }
-
- private boolean isColumnResolved(ColumnMetadata col) {
- return !col.isDynamic() && (!col.isMap() || isMapResolved(col.tupleSchema()));
- }
-
- private boolean isMapResolved(TupleMetadata mapSchema) {
- for (ColumnMetadata col : mapSchema) {
- if (col.isDynamic()) {
- return false;
- }
- if (col.isMap() && !isMapResolved(col.tupleSchema())) {
- return false;
- }
- }
- return true;
- }
-
- public TupleMetadata toSchema() {
- TupleMetadata schema = new TupleSchema();
- for (MutableTupleMetadata.ColumnHandle col : columns) {
- schema.addColumn(col.column());
- }
- return schema;
- }
-
- public void resolveImplicit(ColumnHandle col, ColumnMetadata resolved) {
- col.resolveImplicit(resolved);
- version++;
- }
-
- public void replace(ColumnHandle col, ColumnMetadata resolved) {
- col.replace(resolved);
- version++;
- }
-
- public void resolve(ColumnHandle col, ColumnMetadata resolved) {
- col.resolve(resolved);
- version++;
- }
-}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
index ba3483f..89883b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
@@ -20,8 +20,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -65,11 +67,10 @@
*/
public static class ColumnHandle {
private ColumnMetadata col;
- private boolean isImplicit;
+ private ImplicitColumnMarker marker;
public ColumnHandle(ColumnMetadata col) {
this.col = col;
- this.isImplicit = SchemaUtils.isImplicit(col);
}
public String name() {
@@ -85,19 +86,18 @@
this.col = col;
}
- private void resolveImplicit(ColumnMetadata col) {
+ private void resolveImplicit(ColumnMetadata col, ImplicitColumnMarker marker) {
SchemaUtils.mergeColProperties(this.col, col);
this.col = col;
- markImplicit();
+ markImplicit(marker);
}
- public void markImplicit() {
- Preconditions.checkState(SchemaUtils.isImplicit(col));
- isImplicit = true;
+ public void markImplicit(ImplicitColumnMarker marker) {
+ this.marker = marker;
}
public ColumnMetadata column() { return col; }
- public boolean isImplicit() { return isImplicit; }
+ public boolean isImplicit() { return marker != null; }
@Override
public String toString() {
@@ -105,8 +105,8 @@
}
}
- protected final List<MutableTupleSchema.ColumnHandle> columns = new ArrayList<>();
- protected final Map<String, MutableTupleSchema.ColumnHandle> nameIndex =
+ protected final List<ColumnHandle> columns = new ArrayList<>();
+ protected final Map<String, ColumnHandle> nameIndex =
CaseInsensitiveMap.newHashMap();
private ProjectionType projType;
private int insertPoint = -1;
@@ -114,6 +114,10 @@
public void setProjectionType(ScanSchemaTracker.ProjectionType type) {
this.projType = type;
+
+ // For project none, an empty schema is valid, so
+ // force a bump in schema version.
+ version++;
}
public void setInsertPoint(int insertPoint) {
@@ -130,9 +134,9 @@
* Provide the list of partially-resolved columns. Primarily for
* the implicit column parser.
*/
- public List<MutableTupleSchema.ColumnHandle> columns() { return columns; }
+ public List<ColumnHandle> columns() { return columns; }
- public MutableTupleSchema.ColumnHandle find(String colName) {
+ public ColumnHandle find(String colName) {
return nameIndex.get(colName);
}
@@ -147,31 +151,52 @@
}
public void add(ColumnMetadata col) {
- MutableTupleSchema.ColumnHandle holder = new ColumnHandle(col);
+ ColumnHandle holder = new ColumnHandle(col);
columns.add(holder);
addIndex(holder);
version++;
}
- public void addIndex(MutableTupleSchema.ColumnHandle holder) {
+ public void addIndex(ColumnHandle holder) {
if (nameIndex.put(holder.column().name(), holder) != null) {
throw new IllegalArgumentException("Duplicate scan projection column: " + holder.name());
}
}
- public void insert(int posn, ColumnMetadata col) {
- MutableTupleSchema.ColumnHandle holder = new ColumnHandle(col);
+ public ColumnHandle insert(int posn, ColumnMetadata col) {
+ ColumnHandle holder = new ColumnHandle(col);
columns.add(posn, holder);
addIndex(holder);
version++;
+ return holder;
}
- public void insert(ColumnMetadata col) {
- insert(insertPoint++, col);
+ public ColumnHandle insert(ColumnMetadata col) {
+ return insert(insertPoint++, col);
+ }
+
+ /**
+ * Move a column from its current position (which must be past the insert point)
+ * to the insert point. An index entry already exists. Special operation done
+ * when matching a provided schema to a projection list that includes a wildcard
+ * and explicitly projected columns. Works around unfortunate behavior in the
+ * planner.
+ */
+ public void moveIfExplicit(String colName) {
+ ColumnHandle holder = find(colName);
+ Objects.requireNonNull(holder);
+ int posn = columns.indexOf(holder);
+ if (posn == insertPoint) {
+ insertPoint++;
+ } else if (posn > insertPoint) {
+ columns.remove(posn);
+ columns.add(insertPoint++, holder);
+ version++;
+ }
}
public boolean isResolved() {
- for (MutableTupleSchema.ColumnHandle handle : columns) {
+ for (ColumnHandle handle : columns) {
if (!isColumnResolved(handle.column())) {
return false;
}
@@ -197,14 +222,14 @@
public TupleMetadata toSchema() {
TupleMetadata schema = new TupleSchema();
- for (MutableTupleSchema.ColumnHandle col : columns) {
+ for (ColumnHandle col : columns) {
schema.addColumn(col.column());
}
return schema;
}
- public void resolveImplicit(ColumnHandle col, ColumnMetadata resolved) {
- col.resolveImplicit(resolved);
+ public void resolveImplicit(ColumnHandle col, ColumnMetadata resolved, ImplicitColumnMarker marker) {
+ col.resolveImplicit(resolved, marker);
version++;
}
@@ -217,4 +242,6 @@
col.resolve(resolved);
version++;
}
+
+ public boolean isEmpty() { return columns.isEmpty(); }
}
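
Note: the effect of moveIfExplicit(), shown with a plain-list simulation (hypothetical column names; the real class also maintains markers, a case-insensitive name index, and a schema version). For projection `*, bar` and a provided schema (foo, bar, baz), bar keeps its schema position instead of trailing the wildcard expansion.

import java.util.ArrayList;
import java.util.List;

public class MoveIfExplicitDemo {
  public static void main(String[] args) {
    // Projection [`*`, `bar`]: the wildcard insert point starts at 0,
    // and `bar` is already present from the projection list.
    List<String> columns = new ArrayList<>(List.of("bar"));
    int insertPoint = 0;

    // Apply the provided schema (foo, bar, baz) in order.
    for (String col : new String[] {"foo", "bar", "baz"}) {
      int posn = columns.indexOf(col);
      if (posn < 0) {
        columns.add(insertPoint++, col);   // new column: insert at the wildcard point
      } else if (posn == insertPoint) {
        insertPoint++;                     // already in position
      } else if (posn > insertPoint) {
        columns.remove(posn);              // explicitly projected: pull it forward
        columns.add(insertPoint++, col);
      }
    }
    // Prints [foo, bar, baz]; without the move, bar would end up after baz.
    System.out.println(columns);
  }
}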
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java
index 3e1373e..a11b675 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java
@@ -18,13 +18,13 @@
package org.apache.drill.exec.physical.impl.scan.v3.schema;
import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.DynamicSchemaFilter.RowSchemaFilter;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaResolver.SchemaType;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
* Schema tracker for the "normal" case in which schema starts from a simple
@@ -37,6 +37,7 @@
private final TupleMetadata projection;
private final boolean allowSchemaChange;
private int implicitInsertPoint;
+ private int readerSchemaCount;
private boolean allowMapAdditions = true;
public ProjectionSchemaTracker(TupleMetadata definedSchema,
@@ -47,16 +48,16 @@
this.allowSchemaChange = false;
schema.copyFrom(definedSchema);
validateProjection(parseResult.dynamicSchema, definedSchema);
- checkResolved();
- ScanSchemaTracker.ProjectionType projType;
+ ProjectionType projType;
if (schema.size() == 0) {
- projType = ScanSchemaTracker.ProjectionType.NONE;
+ projType = ProjectionType.NONE;
} else {
- projType = ScanSchemaTracker.ProjectionType.SOME;
+ projType = ProjectionType.SOME;
}
schema.setProjectionType(projType);
this.implicitInsertPoint = -1;
+ checkResolved();
}
public ProjectionSchemaTracker(ProjectionParseResult parseResult, boolean allowSchemaChange,
@@ -67,15 +68,15 @@
this.schema.copyFrom(projection);
// Work out the projection type: wildcard, empty, or explicit.
- ScanSchemaTracker.ProjectionType projType;
+ ProjectionType projType;
if (parseResult.isProjectAll()) {
- projType = ScanSchemaTracker.ProjectionType.ALL;
+ projType = ProjectionType.ALL;
} else if (projection.isEmpty()) {
- projType = ScanSchemaTracker.ProjectionType.NONE;
+ projType = ProjectionType.NONE;
this.isResolved = true;
this.allowMapAdditions = false;
} else {
- projType = ScanSchemaTracker.ProjectionType.SOME;
+ projType = ProjectionType.SOME;
}
this.schema.setProjectionType(projType);
@@ -84,6 +85,11 @@
this.implicitInsertPoint = parseResult.wildcardPosn;
}
+ @Override
+ public ProjectedColumn columnProjection(String colName) {
+ return (ProjectedColumn) projection.metadata(colName);
+ }
+
public void applyProvidedSchema(TupleMetadata providedSchema) {
boolean isStrict = SchemaUtils.isStrict(providedSchema);
new ScanSchemaResolver(schema,
@@ -132,6 +138,8 @@
public ProjectionFilter projectionFilter(CustomErrorContext errorContext) {
switch (projectionType()) {
case ALL:
+
+      // An empty schema implies we've only seen the wildcard thus far.
if (schema.size() == 0) {
return ProjectionFilter.PROJECT_ALL;
}
@@ -146,7 +154,17 @@
@Override
public void applyReaderSchema(TupleMetadata readerOutputSchema,
CustomErrorContext errorContext) {
- new ScanSchemaResolver(schema, SchemaType.READER_SCHEMA, allowMapAdditions, errorContext)
+ SchemaType schemaType;
+
+    // The first reader can reposition columns projected with a wildcard;
+    // later readers cannot, as we want to preserve column order after the
+    // first batch.
+ if (readerSchemaCount == 0 && allowSchemaChange) {
+ schemaType = SchemaType.FIRST_READER_SCHEMA;
+ } else {
+ schemaType = SchemaType.READER_SCHEMA;
+ }
+ new ScanSchemaResolver(schema, schemaType, allowMapAdditions, errorContext)
.applySchema(readerOutputSchema);
if (!allowSchemaChange) {
allowMapAdditions = false;
@@ -155,11 +173,11 @@
}
}
checkResolved();
+ readerSchemaCount++;
}
@Override
- public void expandImplicitCol(ColumnMetadata resolved) {
- Preconditions.checkArgument(SchemaUtils.isImplicit(resolved));
- schema.insert(implicitInsertPoint++, resolved);
+ public void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker) {
+ schema.insert(implicitInsertPoint++, resolved).markImplicit(marker);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java
index 24511a1..9af575a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java
@@ -71,6 +71,8 @@
if (errorContext == null) {
errorContext = EmptyErrorContext.INSTANCE;
}
+
+ // Parse the projection list
ProjectionParseResult result;
if (projectionList == null) {
result = null;
@@ -78,21 +80,45 @@
result = ScanProjectionParser.parse(projectionList);
}
+ // If a strict schema is provided, then no schema changes are allowed.
if (providedSchema != null && SchemaUtils.isStrict(providedSchema)) {
allowSchemaChange = false;
}
+
+ // Figure out the schema tracker to use
if (definedSchema == null) {
+
+ // No defined schema: this is a projection-based tracker, possibly
+ // constrained by a provided schema.
ProjectionSchemaTracker tracker = new ProjectionSchemaTracker(result, allowSchemaChange, errorContext);
+
+ // Apply the provided schema. Doing so forces resolution of the projection
+      // list just applied above.
if (providedSchema != null) {
tracker.applyProvidedSchema(providedSchema);
}
return tracker;
} else {
+
+ // Defined schema case, which is supported only via tests at present;
+ // the planner can't yet produce a defined schema.
+
+      // A defined schema can include dynamic columns (those with no type). If
+      // so, treat the dynamic schema as a combination of a projection list and
+      // a provided schema.
if (!MetadataUtils.hasDynamicColumns(definedSchema)) {
SchemaBasedTracker tracker = new SchemaBasedTracker(definedSchema, errorContext);
- tracker.validateProjection(result.dynamicSchema);
+
+ // A projection list is not required. But, if provided, it must be consistent
+ // with the defined schema.
+ if (result != null) {
+ tracker.validateProjection(result.dynamicSchema);
+ }
return tracker;
} else {
+
+        // The defined schema does contain dynamic columns: treat it as a
+        // combination of a projection list and a provided schema, handled
+        // by the projection-based tracker.
return new ProjectionSchemaTracker(definedSchema, result, errorContext);
}
}
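
Note: a hedged sketch of the builder in use for the common, projection-driven case. The projection list and provided schema are placeholders; the calls themselves mirror how ScanLifecycle builds its tracker in this patch.

// Sketch: the scan lifecycle builds its schema tracker roughly like this.
ScanSchemaTracker tracker = new ScanSchemaConfigBuilder()
    .projection(projectionList)       // parsed SELECT list, possibly a wildcard
    .providedSchema(providedSchema)   // may be null when no schema is configured
    .allowSchemaChange(true)
    .build();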
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java
index 6d1ec40..34ceb9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java
@@ -82,13 +82,14 @@
* Indicates the source of the schema to be analyzed.
* Each schema type has subtly different rules. The
* schema type allows us to inject those differences inline
- * within the resolution process. Also, each schema caries
+ * within the resolution process. Also, each schema carries
* a tag used for error reporting.
*/
public enum SchemaType {
STRICT_PROVIDED_SCHEMA("Provided"),
LENIENT_PROVIDED_SCHEMA("Provided"),
EARLY_READER_SCHEMA("Reader"),
+ FIRST_READER_SCHEMA("Reader"),
READER_SCHEMA("Reader"),
MISSING_COLS("Missing columns");
@@ -105,41 +106,64 @@
private final MutableTupleSchema schema;
private final SchemaType mode;
- private final boolean isProjectAll;
private final boolean allowMapAdditions;
private final String source;
private final CustomErrorContext errorContext;
+ private final boolean allowColumnReorder;
public ScanSchemaResolver(MutableTupleSchema schema, SchemaType mode,
boolean allowMapAdditions,
CustomErrorContext errorContext) {
this.schema = schema;
- this.isProjectAll = schema.projectionType() == ProjectionType.ALL;
this.mode = mode;
this.errorContext = errorContext;
this.allowMapAdditions = allowMapAdditions;
this.source = mode.source();
+ switch (mode) {
+ case STRICT_PROVIDED_SCHEMA:
+ case LENIENT_PROVIDED_SCHEMA:
+ case EARLY_READER_SCHEMA:
+ case FIRST_READER_SCHEMA:
+
+      // Allow reordering columns when the projection is of the form
+      // `*, bar`. Move bar to its place in the schema (foo, bar) rather
+      // than to the end, as is done for implicit columns.
+ this.allowColumnReorder = schema.projectionType() == ProjectionType.ALL;
+ break;
+ default:
+ this.allowColumnReorder = false;
+ }
}
public void applySchema(TupleMetadata sourceSchema) {
switch (schema.projectionType()) {
case ALL:
+ projectSchema(sourceSchema);
+ if (mode == SchemaType.STRICT_PROVIDED_SCHEMA) {
+ schema.setProjectionType(ScanSchemaTracker.ProjectionType.SOME);
+ }
+ break;
case SOME:
projectSchema(sourceSchema);
break;
default:
// Do nothing
}
- if (mode == SchemaType.STRICT_PROVIDED_SCHEMA && isProjectAll) {
- schema.setProjectionType(ScanSchemaTracker.ProjectionType.SOME);
- }
}
/**
- * A project list can contain implicit columns in
- * addition to the wildcard. The wildcard defines the
- * <i>insert point</i>: the point at which reader-defined
- * columns are inserted as found.
+ * A project list can contain implicit columns in addition to the wildcard.
+ * The wildcard defines the <i>insert point</i>: the point at which
+ * reader-defined columns are inserted as found. This version applies a
+ * provided schema to a projection. If we are given a query of the form
+ * {@code SELECT * FROM foo ORDER BY bar}, Drill will give us a projection
+ * list of the form {@code [`**`, `bar`]} and normal projection processing
+ * will project all provided columns, except {@code bar}, in place of the
+ * wildcard. Since this behavior differs from other DBs, we apply special
+ * processing: we move the projected column into the next wildcard position,
+ * as if Drill had not included the extra column projection. This is a hack,
+ * but one that helps with ease-of-use. We apply the same rule to the first
+ * reader schema for the same reason.
*/
private void projectSchema(TupleMetadata sourceSchema) {
for (ColumnMetadata colSchema : sourceSchema) {
@@ -148,6 +172,9 @@
insertColumn(colSchema);
} else {
mergeColumn(existing, colSchema);
+ if (allowColumnReorder) {
+ schema.moveIfExplicit(colSchema.name());
+ }
}
}
}
@@ -160,8 +187,9 @@
*/
private void insertColumn(ColumnMetadata col) {
switch (mode) {
+ case FIRST_READER_SCHEMA:
case READER_SCHEMA:
- if (!isProjectAll) {
+ if (schema.projectionType() != ProjectionType.ALL) {
throw new IllegalStateException(
"Reader should not have projected an unprojected column: " + col.name());
}
@@ -169,7 +197,7 @@
case EARLY_READER_SCHEMA:
case LENIENT_PROVIDED_SCHEMA:
case STRICT_PROVIDED_SCHEMA:
- if (!isProjectAll || SchemaUtils.isExcludedFromWildcard(col)) {
+ if (schema.projectionType() != ProjectionType.ALL || SchemaUtils.isExcludedFromWildcard(col)) {
return;
}
break;
@@ -200,16 +228,9 @@
switch (mode) {
case LENIENT_PROVIDED_SCHEMA:
case STRICT_PROVIDED_SCHEMA:
- // With a wilcard, there should be no existing column unless
- // the planner projected an implicit column and the provided
- // schema defines that same implicit column.
- if (isProjectAll && !SchemaUtils.isImplicit(colSchema)) {
- throw UserException.validationError()
- .message("Provided schema column name conflicts with presumed implicit column name")
- .addContext("Column", colSchema.name())
- .addContext(errorContext)
- .build(logger);
- }
+      // Even with a wildcard, the planner may include additional columns.
+      // Example: SELECT * FROM foo ORDER BY bar
+      // The planner will provide us with [`*`, `bar`].
break;
case EARLY_READER_SCHEMA:
// If the reader offers a column which duplicates an implicit column,
@@ -341,6 +362,7 @@
case LENIENT_PROVIDED_SCHEMA:
case STRICT_PROVIDED_SCHEMA:
break;
+ case FIRST_READER_SCHEMA:
case READER_SCHEMA:
if (!allowMapAdditions) {
throw new IllegalStateException("Reader should not have projected column: " + readerCol.name());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java
index 7d1aa30..af5efa4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.scan.v3.schema;
import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -305,6 +306,11 @@
ProjectionType projectionType();
/**
+ * Return the projection for a column, if any.
+ */
+ ProjectedColumn columnProjection(String colName);
+
+ /**
* Is the scan schema resolved? The schema is resolved depending on the
* complex lifecycle explained in the class comment. Resolution occurs
* when the wildcard (if any) is expanded, and all explicit projection
@@ -349,7 +355,7 @@
* then determines which partition columns are needed and calls this
* method to add each one.
*/
- void expandImplicitCol(ColumnMetadata resolved);
+ void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker);
/**
* Indicate that implicit column parsing is complete. Returns the implicit
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java
index 24beb38..8e5d6ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.scan.v3.schema;
import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.DynamicSchemaFilter.RowSchemaFilter;
import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -39,10 +40,6 @@
super(errorContext);
this.definedSchema = definedSchema;
schema.copyFrom(definedSchema);
- checkResolved();
-
- // If not resolved, should not have used this tracker.
- Preconditions.checkState(isResolved);
ScanSchemaTracker.ProjectionType projType;
if (schema.size() == 0) {
@@ -51,13 +48,24 @@
projType = ScanSchemaTracker.ProjectionType.SOME;
}
schema.setProjectionType(projType);
+ checkResolved();
+
+ // If not resolved, should not have used this tracker.
+ Preconditions.checkState(isResolved);
}
+ /**
+ * Validate a projection list (provided as an argument) against a
+ * defined schema already held by this tracker. Ensures that, when we
+   * have both a defined schema and a projection list, they are
+ * consistent.
+ *
+ * @param projection the parsed projection list
+ */
public void validateProjection(TupleMetadata projection) {
- if (projection == null) {
- return;
+ if (projection != null) {
+ validateProjection(projection, definedSchema);
}
- validateProjection(projection, definedSchema);
}
@Override
@@ -85,10 +93,13 @@
}
@Override
- public void expandImplicitCol(ColumnMetadata resolved) {
+ public void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker) {
throw new IllegalStateException("Can't expand a defined schema.");
}
@Override
public int schemaVersion() { return 1; }
+
+ @Override
+ public ProjectedColumn columnProjection(String colName) { return null; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
index 08df4b3..bf9ba76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
@@ -78,7 +78,6 @@
* variable-length argument list of column values
* @return this writer
*/
-
RowSetLoader addRow(Object... values);
/**
@@ -89,7 +88,6 @@
* @param value value of the one and only column
* @return this writer
*/
-
RowSetLoader addSingleCol(Object value);
/**
@@ -107,7 +105,6 @@
*
* @return true if another row can be written, false if not
*/
-
boolean isFull();
/**
@@ -116,7 +113,6 @@
*
* @return number of rows to be sent downstream
*/
-
int rowCount();
/**
@@ -128,7 +124,6 @@
*
* @return the current write index
*/
-
int rowIndex();
/**
@@ -151,7 +146,6 @@
*
* @return true if another row can be added, false if the batch is full
*/
-
boolean start();
/**
@@ -160,6 +154,5 @@
* to recover from partially-written rows that turn out to contain errors.
* Done automatically if using <tt>setRow()</tt>.
*/
-
void save();
}
\ No newline at end of file
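
Note: a hedged sketch of the row-writing protocol the javadoc above describes. rowWriter is the RowSetLoader obtained from the reader's ResultSetLoader; the records iterator and its values are placeholders.

// Sketch: write rows until the batch is full or the input is exhausted.
while (!rowWriter.isFull() && records.hasNext()) {
  Object[] record = records.next();

  // Convenience form: one call per row, one value per projected column.
  rowWriter.addRow(record);

  // Equivalent long form, useful when a partially-written row may need
  // to be abandoned on error:
  //   rowWriter.start();
  //   ... set individual column values ...
  //   rowWriter.save();
}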
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 0467abc..8fe4ebd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -317,7 +317,7 @@
* @param length row group length
* @return implicit column value for specified implicit file column
*/
- private static String getImplicitColumnValue(ImplicitFileColumn column, Path filePath,
+ public static String getImplicitColumnValue(ImplicitFileColumn column, Path filePath,
FileSystem fs, Integer index, Long start, Long length) {
if (column instanceof ImplicitFileColumns) {
ImplicitFileColumns fileColumn = (ImplicitFileColumns) column;
@@ -359,9 +359,10 @@
}
/**
- * Returns list of implicit file columns which includes all elements from {@link ImplicitFileColumns},
- * {@link ImplicitInternalFileColumns#LAST_MODIFIED_TIME} and {@link ImplicitInternalFileColumns#USE_METADATA}
- * columns.
+ * Returns list of implicit file columns which includes all elements from
+ * {@link ImplicitFileColumns},
+ * {@link ImplicitInternalFileColumns#LAST_MODIFIED_TIME} and
+ * {@link ImplicitInternalFileColumns#USE_METADATA} columns.
*
* @return list of implicit file columns
*/
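
Note: with getImplicitColumnValue() now public, code outside ColumnExplorer (such as the EVF2 implicit-column machinery) can compute a single implicit value. A hedged sketch; the column constant, the file path, and the null row-group arguments are illustrative assumptions.

// Sketch: resolve the "filename" implicit column for one file.
// fs is a Hadoop FileSystem; index/start/length apply only to row-group
// level columns, so they are passed as null here (an assumption).
String filename = ColumnExplorer.getImplicitColumnValue(
    ImplicitFileColumns.FILENAME,
    new Path("/data/sales/2020/jan.csv"),
    fs, null, null, null);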
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
index c22d453..580ba7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -23,9 +23,9 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -72,7 +72,7 @@
TupleMetadata readerSchema = AvroSchemaUtil.convert(reader.getSchema());
logger.debug("Avro file converted schema: {}", readerSchema);
TupleMetadata providedSchema = negotiator.providedSchema();
- TupleMetadata tableSchema = StandardConversions.mergeSchemas(providedSchema, readerSchema);
+ TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
logger.debug("Avro file table schema: {}", tableSchema);
negotiator.tableSchema(tableSchema, true);
loader = negotiator.build();
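The schema-merge helper used here moves from StandardConversions to FixedReceiver.Builder. A minimal sketch of the relocated call, not part of the patch, with the priority rule exercised by TestDirectConverter further below (provided columns win, reader-only columns are appended):

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class MergeSchemasSketch {
  public static TupleMetadata merge() {
    TupleMetadata provided = new SchemaBuilder()
        .add("a", MinorType.INT)        // provided type overrides the reader's VARCHAR
        .buildSchema();
    TupleMetadata readerSchema = new SchemaBuilder()
        .add("a", MinorType.VARCHAR)
        .add("b", MinorType.VARCHAR)    // not in the provided schema, kept as read
        .buildSchema();
    return FixedReceiver.Builder.mergeSchemas(provided, readerSchema);
  }
}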
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java
index 3c42227..527f21d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java
@@ -21,6 +21,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -57,7 +58,7 @@
if (providedSchema == null) {
standardConversions = null;
} else {
- standardConversions = new StandardConversions(providedSchema.properties());
+ standardConversions = StandardConversions.builder().withSchema(providedSchema).build();
}
}
@@ -157,7 +158,7 @@
if (standardConversions == null) {
valueWriter = scalarWriter;
} else {
- valueWriter = standardConversions.converter(scalarWriter, readerSchema);
+ valueWriter = standardConversions.converterFor(scalarWriter, readerSchema);
}
return buildScalar(readerSchema, valueWriter);
}
@@ -246,7 +247,7 @@
TupleWriter tupleWriter, List<ColumnConverter> converters) {
// fill in tuple schema for cases when it contains recursive named record types
TupleMetadata readerSchema = AvroSchemaUtil.convert(genericRecord.getSchema());
- TupleMetadata tableSchema = StandardConversions.mergeSchemas(providedSchema, readerSchema);
+ TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
tableSchema.toMetadataList().forEach(tupleWriter::addColumn);
IntStream.range(0, tableSchema.size())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 8e3e69e..79840c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -19,8 +19,6 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -160,7 +158,7 @@
for (int i = 0; i < fieldNames.length; i++) {
ScalarWriter colWriter = writer.scalar(fieldNames[i]);
if (writer.isProjected()) {
- colWriters[i] = conversions.converter(colWriter, MinorType.VARCHAR);
+ colWriters[i] = conversions.converterFor(colWriter, MinorType.VARCHAR);
} else {
colWriters[i] = colWriter;
}
@@ -220,7 +218,7 @@
StandardConversions conversions = conversions(providedSchema);
ValueWriter[] colWriters = new ValueWriter[providedSchema.size()];
for (int i = 0; i < colWriters.length; i++) {
- colWriters[i] = conversions.converter(
+ colWriters[i] = conversions.converterFor(
writer.scalar(providedSchema.metadata(i).name()), MinorType.VARCHAR);
}
return new ConstrainedFieldOutput(writer, colWriters);
@@ -248,14 +246,10 @@
// CSV maps blank columns to nulls (for nullable non-string columns),
// or to the default value (for non-nullable non-string columns.)
- Map<String, String> props = providedSchema.properties();
- if (props == null) {
- return new StandardConversions(ColumnMetadata.BLANK_AS_NULL);
- } else {
- props = new HashMap<>(props);
- props.put(ColumnMetadata.BLANK_AS_PROP, ColumnMetadata.BLANK_AS_NULL);
- return new StandardConversions(props);
- }
+ return StandardConversions.builder()
+ .withSchema(providedSchema)
+ .blankAs(ColumnMetadata.BLANK_AS_NULL)
+ .build();
}
/**
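Across these readers the StandardConversions constructors are replaced by a builder. A minimal sketch of the pattern, not part of the patch; the builder accepts a null provided schema (as the LogBatchReader hunk below relies on), blankAs() is only needed for text-style readers, and the ValueWriter import path is assumed:

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ValueWriter;

public class ConversionsSketch {
  // Builds a converting writer for one column: the reader supplies VARCHAR text,
  // and the converter adapts it to whatever type the provided schema declares.
  public static ValueWriter writerFor(TupleMetadata providedSchema,
      RowSetLoader rowWriter, int colIndex) {
    StandardConversions conversions = StandardConversions.builder()
        .withSchema(providedSchema)
        .blankAs(ColumnMetadata.BLANK_AS_NULL)
        .build();
    return conversions.converterFor(rowWriter.scalar(colIndex), MinorType.VARCHAR);
  }
}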
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
index f8c512e..99efd76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -90,7 +90,7 @@
writers = new ValueWriter[readerSchema.size()];
for (int i = 0; i < writers.length; i++) {
ColumnMetadata colSchema = readerSchema.metadata(i);
- writers[i] = conversions.converter(rowWriter.scalar(i), colSchema);
+ writers[i] = conversions.converterFor(rowWriter.scalar(i), colSchema);
}
}
@@ -178,9 +178,9 @@
if (saveMatchedRows) {
// Save using the defined columns
TupleMetadata providedSchema = config.providedSchema;
- StandardConversions conversions = new StandardConversions(
- providedSchema == null || !providedSchema.hasProperties() ?
- null : providedSchema.properties());
+ StandardConversions conversions = StandardConversions.builder()
+ .withSchema(providedSchema)
+ .build();
vectorWriter = new ScalarGroupWriter(writer, config.readerSchema, conversions);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
index 4f16e76..bfdff94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
@@ -45,14 +45,23 @@
public class ScanTestUtils {
- // Default file metadata column names; primarily for testing.
+ // Default implicit file column names; primarily for testing.
public static final String FILE_NAME_COL = "filename";
public static final String FULLY_QUALIFIED_NAME_COL = "fqn";
public static final String FILE_PATH_COL = "filepath";
public static final String SUFFIX_COL = "suffix";
public static final String PARTITION_COL = "dir";
+
+  // Default internal implicit column names; primarily for testing.
+
public static final String LAST_MODIFIED_TIME_COL = "lmt";
+ public static final String ROW_GROUP_INDEX_COL = "rgi";
+ public static final String ROW_GROUP_START_COL = "rgs";
+ public static final String ROW_GROUP_LENGTH_COL = "rgl";
+
+  // Yes, the following two constants have the same value.
+  public static final String USE_METADATA_COL = "$project_metadata$";
public static final String PROJECT_METADATA_COL = "$project_metadata$";
public static abstract class ScanFixtureBuilder {
@@ -71,8 +80,12 @@
builder().projection(RowSetTestUtils.projectAll());
}
- public void projectAllWithMetadata(int dirs) {
- builder().projection(ScanTestUtils.projectAllWithMetadata(dirs));
+ public void projectAllWithFileImplicit(int dirs) {
+ builder().projection(ScanTestUtils.projectAllWithFileImplicit(dirs));
+ }
+
+ public void projectAllWithAllImplicit(int dirs) {
+ builder().projection(ScanTestUtils.projectAllWithAllImplicit(dirs));
}
public void setProjection(String... projCols) {
@@ -101,7 +114,7 @@
public static class ScanFixture {
- private OperatorContext opContext;
+ private final OperatorContext opContext;
public ScanOperatorExec scanOp;
public ScanFixture(OperatorContext opContext, ScanOperatorExec scanOp) {
@@ -152,7 +165,7 @@
* @return schema with the metadata columns appended to the table columns
*/
- public static TupleMetadata expandMetadata(TupleMetadata base, ImplicitColumnManager metadataProj, int dirCount) {
+ public static TupleMetadata expandImplicit(TupleMetadata base, ImplicitColumnManager metadataProj, int dirCount) {
TupleMetadata metadataSchema = new TupleSchema();
for (ColumnMetadata col : base) {
metadataSchema.addColumn(col);
@@ -188,24 +201,32 @@
return schema;
}
- public static List<SchemaPath> expandMetadata(int dirCount) {
+ public static List<String> expandImplicit(boolean includeInternal, int dirCount) {
List<String> selected = Lists.newArrayList(
FULLY_QUALIFIED_NAME_COL,
FILE_PATH_COL,
FILE_NAME_COL,
- SUFFIX_COL,
- LAST_MODIFIED_TIME_COL,
- PROJECT_METADATA_COL);
+ SUFFIX_COL);
+ if (includeInternal) {
+ selected.add(LAST_MODIFIED_TIME_COL);
+ selected.add(PROJECT_METADATA_COL);
+ }
for (int i = 0; i < dirCount; i++) {
selected.add(PARTITION_COL + i);
}
- return RowSetTestUtils.projectList(selected);
+ return selected;
}
- public static List<SchemaPath> projectAllWithMetadata(int dirCount) {
+ public static List<SchemaPath> projectAllWithFileImplicit(int dirCount) {
return RowSetTestUtils.concat(
RowSetTestUtils.projectAll(),
- expandMetadata(dirCount));
+ RowSetTestUtils.projectList(expandImplicit(false, dirCount)));
+ }
+
+ public static List<SchemaPath> projectAllWithAllImplicit(int dirCount) {
+ return RowSetTestUtils.concat(
+ RowSetTestUtils.projectAll(),
+ RowSetTestUtils.projectList(expandImplicit(true, dirCount)));
}
}
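The renamed test helpers now separate the file implicit columns from the internal ones. A small, hypothetical usage sketch (not part of the patch) of the two projection lists:

import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;

public class ProjectionSketch {
  public static void show() {
    // Wildcard plus fqn, filepath, filename, suffix and two dir columns.
    List<SchemaPath> fileOnly = ScanTestUtils.projectAllWithFileImplicit(2);
    // The same list plus the internal lmt and $project_metadata$ columns.
    List<SchemaPath> all = ScanTestUtils.projectAllWithAllImplicit(2);
    System.out.println(fileOnly.size() + " file-implicit vs " + all.size() + " total");
  }
}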
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index d4d806a..2efbe2a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -308,7 +308,7 @@
// Create the scan operator
FileScanFixtureBuilder builder = new FileScanFixtureBuilder();
- builder.projectAllWithMetadata(2);
+ builder.projectAllWithAllImplicit(2);
builder.addReader(reader);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
index 65646bc..87ea958 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
@@ -82,6 +82,7 @@
private void buildWriters(TupleMetadata providedSchema,
TupleMetadata schema) {
RowSetLoader rowWriter = tableLoader.writer();
+ StandardConversions conversions = StandardConversions.builder().build();
for (int i = 0; i < schema.size(); i++) {
ColumnMetadata colSchema = schema.metadata(i);
String colName = colSchema.name();
@@ -96,7 +97,7 @@
writers.add(colSchema.name(), rowWriter.scalar(colIndex));
} else {
writers.add(colSchema.name(),
- StandardConversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
+ conversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java
index f57cc5c..36fa31a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java
@@ -131,7 +131,7 @@
// Verify
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
String fqn = ImplicitFileColumns.FQN.getValue(filePath);
String filePathValue = ImplicitFileColumns.FILEPATH.getValue(filePath);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
index 808447d..aa106c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
@@ -32,6 +32,7 @@
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions.ConversionDefn;
import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions.ConversionType;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -70,13 +71,13 @@
}
public ConversionTestFixture withProperties(Map<String,String> props) {
- conversions = new StandardConversions(props);
+ conversions = StandardConversions.builder().withProperties(props).build();
return this;
}
private StandardConversions conversions() {
if (conversions == null) {
- conversions = new StandardConversions();
+ conversions = StandardConversions.builder().build();
}
return conversions;
}
@@ -92,7 +93,7 @@
// Test uses simple row writer; no support for adding columns.
assertNotNull(colWriter);
- ValueWriter converter = conversions().converter(colWriter, source);
+ ValueWriter converter = conversions().converterFor(colWriter, source);
assertNotNull(converter);
rowFormat.add(source.name(), converter);
}
@@ -136,7 +137,7 @@
.add("d", MinorType.VARCHAR)
.build();
- TupleMetadata mergedSchema = StandardConversions.mergeSchemas(providedSchema, readerSchema);
+ TupleMetadata mergedSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
assertTrue(expected.isEquivalent(mergedSchema));
assertTrue(mergedSchema.booleanProperty("foo"));
}
@@ -680,6 +681,7 @@
*/
@Test
public void testBasicConversionType() {
+ StandardConversions conversions = StandardConversions.builder().build();
TupleMetadata schema = new SchemaBuilder()
.add("ti", MinorType.TINYINT)
.add("si", MinorType.SMALLINT)
@@ -700,84 +702,84 @@
ColumnMetadata stringCol = schema.metadata("str");
// TinyInt --> x
- expect(ConversionType.NONE, StandardConversions.analyze(tinyIntCol, tinyIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, smallIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, intCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, bigIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, float4Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, float8Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(tinyIntCol, stringCol));
+ expect(ConversionType.NONE, conversions.analyze(tinyIntCol, tinyIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, smallIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, intCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, bigIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float4Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float8Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(tinyIntCol, stringCol));
// SmallInt --> x
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(smallIntCol, tinyIntCol));
- expect(ConversionType.NONE, StandardConversions.analyze(smallIntCol, smallIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, intCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, bigIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, float4Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, float8Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(smallIntCol, stringCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(smallIntCol, tinyIntCol));
+ expect(ConversionType.NONE, conversions.analyze(smallIntCol, smallIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, intCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, bigIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, float4Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, float8Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(smallIntCol, stringCol));
// Int --> x
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(intCol, tinyIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(intCol, smallIntCol));
- expect(ConversionType.NONE, StandardConversions.analyze(intCol, intCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, bigIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, float4Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, float8Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(intCol, stringCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(intCol, tinyIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(intCol, smallIntCol));
+ expect(ConversionType.NONE, conversions.analyze(intCol, intCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(intCol, bigIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(intCol, float4Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(intCol, float8Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(intCol, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(intCol, stringCol));
// BigInt --> x
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, tinyIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, smallIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, intCol));
- expect(ConversionType.NONE, StandardConversions.analyze(bigIntCol, bigIntCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, float4Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, float8Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(bigIntCol, stringCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(bigIntCol, tinyIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(bigIntCol, smallIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(bigIntCol, intCol));
+ expect(ConversionType.NONE, conversions.analyze(bigIntCol, bigIntCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, float4Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, float8Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(bigIntCol, stringCol));
// Float4 --> x
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, tinyIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, smallIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, intCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, bigIntCol));
- expect(ConversionType.NONE, StandardConversions.analyze(float4Col, float4Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(float4Col, float8Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(float4Col, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(float4Col, stringCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, tinyIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, smallIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, intCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, bigIntCol));
+ expect(ConversionType.NONE, conversions.analyze(float4Col, float4Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(float4Col, float8Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(float4Col, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(float4Col, stringCol));
// Float8 --> x
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, tinyIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, smallIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, intCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, bigIntCol));
- expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, float4Col));
- expect(ConversionType.NONE, StandardConversions.analyze(float8Col, float8Col));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(float8Col, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(float8Col, stringCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, tinyIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, smallIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, intCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, bigIntCol));
+ expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, float4Col));
+ expect(ConversionType.NONE, conversions.analyze(float8Col, float8Col));
+ expect(ConversionType.IMPLICIT, conversions.analyze(float8Col, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(float8Col, stringCol));
// Decimal --> x
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, tinyIntCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, smallIntCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, intCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, bigIntCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, float4Col));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, float8Col));
- expect(ConversionType.NONE, StandardConversions.analyze(decimalCol, decimalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, tinyIntCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, smallIntCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, intCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, bigIntCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, float4Col));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, float8Col));
+ expect(ConversionType.NONE, conversions.analyze(decimalCol, decimalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, stringCol));
// VarChar --> x
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, tinyIntCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, smallIntCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, intCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, bigIntCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, float4Col));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, float8Col));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, decimalCol));
- expect(ConversionType.NONE, StandardConversions.analyze(stringCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, tinyIntCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, smallIntCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, intCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, bigIntCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, float4Col));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, float8Col));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, decimalCol));
+ expect(ConversionType.NONE, conversions.analyze(stringCol, stringCol));
}
/**
@@ -785,6 +787,7 @@
*/
@Test
public void testSpecialConversionType() {
+ StandardConversions conversions = StandardConversions.builder().build();
TupleMetadata schema = new SchemaBuilder()
.add("time", MinorType.TIME)
.add("date", MinorType.DATE)
@@ -807,40 +810,40 @@
ColumnMetadata stringCol = schema.metadata("str");
// TIME
- expect(ConversionType.NONE, StandardConversions.analyze(timeCol, timeCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(timeCol, stringCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, timeCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, timeCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(timeCol, intCol));
+ expect(ConversionType.NONE, conversions.analyze(timeCol, timeCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(timeCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, timeCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(intCol, timeCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(timeCol, intCol));
// DATE
- expect(ConversionType.NONE, StandardConversions.analyze(dateCol, dateCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(dateCol, stringCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, dateCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, dateCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(dateCol, bigIntCol));
+ expect(ConversionType.NONE, conversions.analyze(dateCol, dateCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(dateCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, dateCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, dateCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(dateCol, bigIntCol));
// TIMESTAMP
- expect(ConversionType.NONE, StandardConversions.analyze(tsCol, tsCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(tsCol, stringCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, tsCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, tsCol));
- expect(ConversionType.IMPLICIT, StandardConversions.analyze(tsCol, bigIntCol));
+ expect(ConversionType.NONE, conversions.analyze(tsCol, tsCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(tsCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, tsCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, tsCol));
+ expect(ConversionType.IMPLICIT, conversions.analyze(tsCol, bigIntCol));
// INTERVAL
- expect(ConversionType.NONE, StandardConversions.analyze(intervalCol, intervalCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(intervalCol, stringCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, intervalCol));
+ expect(ConversionType.NONE, conversions.analyze(intervalCol, intervalCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(intervalCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, intervalCol));
// INTERVALYEAR
- expect(ConversionType.NONE, StandardConversions.analyze(yearCol, yearCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(yearCol, stringCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, yearCol));
+ expect(ConversionType.NONE, conversions.analyze(yearCol, yearCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(yearCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, yearCol));
// INTERVALDAY
- expect(ConversionType.NONE, StandardConversions.analyze(dayCol, dayCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(dayCol, stringCol));
- expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, dayCol));
+ expect(ConversionType.NONE, conversions.analyze(dayCol, dayCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(dayCol, stringCol));
+ expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, dayCol));
}
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index f4cee33..c5d92fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -24,7 +24,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -47,7 +47,7 @@
* Test the level of projection done at the level of the scan as a whole;
* before knowledge of table "implicit" columns or the specific table schema.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanLevelProjection extends SubOperatorTest {
private boolean isProjected(ProjectionFilter filter, ColumnMetadata col) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index ea55bc9..842f18e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -87,7 +87,6 @@
* because such an algorithm would require time-travel: looking into
* the future to know what data will be scanned.)
*/
-
@Category(RowSetTests.class)
public class TestSchemaSmoothing extends SubOperatorTest {
@@ -103,12 +102,10 @@
* discrete is just to run the basic lifecycle in a way that
* is compatible with the schema-persistence version.
*/
-
@Test
public void testDiscrete() {
// Set up the file metadata manager
-
Path filePathA = new Path("hdfs:///w/x/y/a.csv");
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -116,18 +113,15 @@
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
-
ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
ScanTestUtils.parsers(metadataManager.projectionParser()));
{
// Define a file a.csv
-
metadataManager.startFile(filePathA);
// Build the output schema from the (a, b) table schema
-
TupleMetadata twoColSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
@@ -139,7 +133,6 @@
ScanTestUtils.resolvers(metadataManager));
// Verify the full output schema
-
TupleMetadata expectedSchema = new SchemaBuilder()
.add("filename", MinorType.VARCHAR)
.add("a", MinorType.INT)
@@ -147,21 +140,18 @@
.buildSchema();
// Verify
-
List<ResolvedColumn> columns = rootTuple.columns();
assertEquals(3, columns.size());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
assertTrue(columns.get(1) instanceof ResolvedTableColumn);
- }
+ }
{
// Define a file b.csv
-
metadataManager.startFile(filePathB);
// Build the output schema from the (a) table schema
-
TupleMetadata oneColSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
@@ -177,7 +167,6 @@
// vector level as part of vector persistence.) During projection, it is
// marked with type NULL so that the null column builder will fill in
// the proper type.
-
TupleMetadata expectedSchema = new SchemaBuilder()
.add("filename", MinorType.VARCHAR)
.add("a", MinorType.INT)
@@ -185,7 +174,6 @@
.buildSchema();
// Verify
-
List<ResolvedColumn> columns = rootTuple.columns();
assertEquals(3, columns.size());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
@@ -200,7 +188,6 @@
* Low-level test of the smoothing projection, including the exceptions
* it throws when things are not going its way.
*/
-
@Test
public void testSmoothingProjection() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -208,7 +195,6 @@
ScanTestUtils.parsers());
// Table 1: (a: nullable bigint, b)
-
final TupleMetadata schema1 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
@@ -225,7 +211,6 @@
}
// Table 2: (a: nullable bigint, c), column omitted, original schema preserved
-
final TupleMetadata schema2 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.add("c", MinorType.FLOAT8)
@@ -243,7 +228,6 @@
}
// Table 3: (a, c, d), column added, must replan schema
-
final TupleMetadata schema3 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
@@ -262,7 +246,6 @@
}
// Table 4: (a: double), change type must replan schema
-
final TupleMetadata schema4 = new SchemaBuilder()
.addNullable("a", MinorType.FLOAT8)
.addNullable("b", MinorType.VARCHAR)
@@ -280,7 +263,6 @@
}
// Table 5: Drop a non-nullable column, must replan
-
final TupleMetadata schema6 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
@@ -302,7 +284,6 @@
* schema. Discard prior schema. Turn off auto expansion of
* metadata for a simpler test.
*/
-
@Test
public void testSmaller() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -340,7 +321,6 @@
* Case in which the table schema and prior are disjoint
* sets. Discard the prior schema.
*/
-
@Test
public void testDisjoint() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -377,7 +357,6 @@
/**
* Column names match, but types differ. Discard the prior schema.
*/
-
@Test
public void testDifferentTypes() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -411,7 +390,6 @@
* schema (though, the output is no different than if we discarded
* the prior schema...)
*/
-
@Test
public void testSameSchemas() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -446,7 +424,6 @@
* The prior and table schemas are identical, but the cases of names differ.
* Preserve the case of the first schema.
*/
-
@Test
public void testDifferentCase() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -481,7 +458,6 @@
* Can't preserve the prior schema if it had required columns
* where the new schema has no columns.
*/
-
@Test
public void testRequired() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -513,7 +489,6 @@
* Preserve the prior schema if table is a subset and missing columns
* are nullable or repeated.
*/
-
@Test
public void testMissingNullableColumns() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -546,7 +521,6 @@
* Preserve the prior schema if table is a subset. Map the table
* columns to the output using the prior schema ordering.
*/
-
@Test
public void testReordering() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -580,12 +554,10 @@
* If using the legacy wildcard expansion, reuse schema if partition paths
* are the same length.
*/
-
@Test
public void testSamePartitionLength() {
// Set up the file metadata manager
-
Path filePathA = new Path("hdfs:///w/x/y/a.csv");
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -593,9 +565,8 @@
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
-
ScanLevelProjection scanProj = ScanLevelProjection.build(
- ScanTestUtils.projectAllWithMetadata(2),
+ ScanTestUtils.projectAllWithAllImplicit(2),
ScanTestUtils.parsers(metadataManager.projectionParser()));
// Define the schema smoother
@@ -608,7 +579,7 @@
.add("b", MinorType.VARCHAR)
.buildSchema();
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
{
metadataManager.startFile(filePathA);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
@@ -627,12 +598,10 @@
* is shorter than the previous. (Unneeded partitions will be set to null by the
* scan projector.)
*/
-
@Test
public void testShorterPartitionLength() {
// Set up the file metadata manager
-
Path filePathA = new Path("hdfs:///w/x/y/a.csv");
Path filePathB = new Path("hdfs:///w/x/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -640,13 +609,11 @@
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
-
ScanLevelProjection scanProj = ScanLevelProjection.build(
- ScanTestUtils.projectAllWithMetadata(2),
+ ScanTestUtils.projectAllWithAllImplicit(2),
ScanTestUtils.parsers(metadataManager.projectionParser()));
// Define the schema smoother
-
SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers(metadataManager));
@@ -655,7 +622,7 @@
.add("b", MinorType.VARCHAR)
.buildSchema();
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
{
metadataManager.startFile(filePathA);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
@@ -674,12 +641,10 @@
* schema even if the new partition path is longer than the previous.
* Because all file names are provided up front.
*/
-
@Test
public void testLongerPartitionLength() {
// Set up the file metadata manager
-
Path filePathA = new Path("hdfs:///w/x/a.csv");
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -687,13 +652,11 @@
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
-
ScanLevelProjection scanProj = ScanLevelProjection.build(
- ScanTestUtils.projectAllWithMetadata(2),
+ ScanTestUtils.projectAllWithAllImplicit(2),
ScanTestUtils.parsers(metadataManager.projectionParser()));
// Define the schema smoother
-
SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers(metadataManager));
@@ -702,7 +665,7 @@
.add("b", MinorType.VARCHAR)
.buildSchema();
- TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+ TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
{
metadataManager.startFile(filePathA);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
@@ -719,7 +682,6 @@
/**
* Integrated test across multiple schemas at the batch level.
*/
-
@Test
public void testSmoothableSchemaBatches() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestFixedReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestFixedReceiver.java
new file mode 100644
index 0000000..ef4dcd3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestFixedReceiver.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver.Builder;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+public class TestFixedReceiver extends BaseScanTest {
+
+ /**
+ * Mock reader which writes columns as strings, using
+ * standard conversions to convert to different types as
+ * specified in a provided schema.
+ */
+ private static class MockReader implements ManagedReader {
+
+ private final FixedReceiver receiver;
+
+ public MockReader(SchemaNegotiator negotiator) {
+ Builder builder = FixedReceiver.builderFor(negotiator)
+ .schemaIsComplete();
+ TupleMetadata readerSchema = new SchemaBuilder()
+ .add("ti", MinorType.VARCHAR)
+ .add("si", MinorType.VARCHAR)
+ .add("int", MinorType.VARCHAR)
+ .add("bi", MinorType.VARCHAR)
+ .add("fl", MinorType.VARCHAR)
+ .add("db", MinorType.VARCHAR)
+ .buildSchema();
+ receiver = builder.build(readerSchema);
+ }
+
+ @Override
+ public boolean next() {
+ if (receiver.rowWriter().loader().batchCount() > 0) {
+ return false;
+ }
+ receiver.start();
+ receiver.scalar(0).setString("11");
+ receiver.scalar(1).setString("12");
+ receiver.scalar(2).setString("13");
+ receiver.scalar(3).setString("14");
+ receiver.scalar(4).setString("15.5");
+ receiver.scalar(5).setString("16.25");
+ receiver.save();
+
+ receiver.start();
+ receiver.scalar("ti").setString("127");
+ receiver.scalar("si").setString("32757");
+ receiver.scalar("int").setString(Integer.toString(Integer.MAX_VALUE));
+ receiver.scalar("bi").setString(Long.toString(Long.MAX_VALUE));
+ receiver.scalar("fl").setString("10E6");
+ receiver.scalar("db").setString("10E200");
+ receiver.save();
+ return true;
+ }
+
+ @Override
+ public void close() { }
+ }
+
+ /**
+ * Test the standard string-to-type conversion using an ad-hoc conversion
+ * from the input type (the type used by the row set builder) to the output
+ * (vector) type.
+ */
+ @Test
+ public void testFixedReceiver() {
+
+ // Create the provided and output schemas
+ TupleMetadata outputSchema = new SchemaBuilder()
+ .add("ti", MinorType.TINYINT)
+ .add("si", MinorType.SMALLINT)
+ .add("int", MinorType.INT)
+ .add("bi", MinorType.BIGINT)
+ .add("fl", MinorType.FLOAT4)
+ .add("db", MinorType.FLOAT8)
+ .buildSchema();
+
+ // Create the scan
+ BaseScanFixtureBuilder builder = simpleBuilder(negotiator -> new MockReader(negotiator));
+ builder.builder.providedSchema(outputSchema);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ // Load test data using converters
+ assertTrue(scan.next());
+
+ // Build the expected vector without a type converter.
+ final SingleRowSet expected = fixture.rowSetBuilder(outputSchema)
+ .addRow(11, 12, 13, 14L, 15.5F, 16.25D)
+ .addRow(127, 32757, Integer.MAX_VALUE, Long.MAX_VALUE, 10E6F, 10E200D)
+ .build();
+
+ // Compare
+ VectorContainer container = scan.batchAccessor().container();
+ RowSetUtilities.verify(expected, fixture.wrap(container));
+ scanFixture.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java
index 2886d26..cba3a79 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java
@@ -23,7 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
@@ -38,7 +38,7 @@
* Tests the basics of the scan operator protocol: error conditions,
* etc.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanBasics extends BaseScanTest {
private static final Logger logger = LoggerFactory.getLogger(TestScanBasics.class);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java
index c0fbbfb..b68dc16 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java
@@ -21,7 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -34,7 +34,7 @@
* Test "early schema" readers: those that can declare a schema at
* open time.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanEarlySchema extends BaseScanTest {
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java
index cdbc3b1..cfd3d09 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java
@@ -22,7 +22,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
@@ -40,7 +40,7 @@
* Test "late schema" readers: those like JSON that discover their schema
* as they read data.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanLateSchema extends BaseScanTest {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
index 9d94e9f..7b14c41 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
@@ -20,7 +20,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
@@ -43,7 +43,7 @@
* defines the schema to be output from the scan operator, and forces
* conversions between reader and output data types.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanOuputSchema extends BaseScanTest {
private static class MockSimpleReader implements ManagedReader {
@@ -78,6 +78,7 @@
private void buildWriters(TupleMetadata providedSchema,
TupleMetadata schema) {
RowSetLoader rowWriter = tableLoader.writer();
+ StandardConversions conversions = StandardConversions.builder().build();
for (int i = 0; i < schema.size(); i++) {
ColumnMetadata colSchema = schema.metadata(i);
String colName = colSchema.name();
@@ -92,7 +93,7 @@
writers.add(colSchema.name(), rowWriter.scalar(colIndex));
} else {
writers.add(colSchema.name(),
- StandardConversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
+ conversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java
index efdcbbf..d813ec9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java
@@ -23,7 +23,7 @@
import java.util.Arrays;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
@@ -35,7 +35,7 @@
/**
* Test vector overflow in the context of the scan operator.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanOverflow extends BaseScanTest {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java
index ad3dc01..e35fc39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl.scan.v3.file;
import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver;
import org.apache.hadoop.fs.Path;
public interface MockFileNames {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java
index 24563c1..0dfdd9e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java
@@ -18,45 +18,41 @@
package org.apache.drill.exec.physical.impl.scan.v3.file;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import java.io.IOException;
+
+import org.apache.drill.categories.EvfTest;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
import org.apache.drill.test.BaseTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestFileDescrip extends BaseTest {
- /**
- * Degenerate case: no file or root
- */
- @Test
- public void testEmpty() {
- FileDescrip fd = new FileDescrip(null, null);
- assertFalse(fd.isSet());
- assertNull(fd.filePath());
- assertEquals(0, fd.dirPathLength());
- assertNull(fd.partition(0));
+ private static final DrillFileSystem dfs;
+
+ static {
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+ try {
+ dfs = new DrillFileSystem(conf);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
- /**
- * Degenerate case: no file path, but with as selection root
- * Should never occur in practice.
- */
- @Test
- public void testNoPath() {
- Path root = new Path("hdfs://a/b");
- FileDescrip fd = new FileDescrip(null, root);
- assertFalse(fd.isSet());
- assertNull(fd.filePath());
- assertEquals(0, fd.dirPathLength());
- assertNull(fd.partition(0));
+ private FileDescrip fileDescrip(Path input, Path root) {
+ return new FileDescrip(dfs,
+ new FileWorkImpl(0, 1000, input),
+ root);
}
/**
@@ -65,9 +61,8 @@
*/
@Test
public void testNoRoot() {
- Path input = new Path("hdfs://foo.csv");
- FileDescrip fd = new FileDescrip(input, null);
- assertTrue(fd.isSet());
+ Path input = new Path("file:///foo.csv");
+ FileDescrip fd = fileDescrip(input, null);
assertSame(input, fd.filePath());
assertEquals(0, fd.dirPathLength());
assertNull(fd.partition(0));
@@ -78,9 +73,8 @@
*/
@Test
public void testSingleFile() {
- Path input = new Path("hdfs://a/b/c/foo.csv");
- FileDescrip fd = new FileDescrip(input, null);
- assertTrue(fd.isSet());
+ Path input = new Path("file:///a/b/c/foo.csv");
+ FileDescrip fd = fileDescrip(input, null);
assertSame(input, fd.filePath());
assertEquals(0, fd.dirPathLength());
assertNull(fd.partition(0));
@@ -91,10 +85,9 @@
*/
@Test
public void testRootFile() {
- Path root = new Path("hdfs://a/b");
- Path input = new Path("hdfs://a/b/foo.csv");
- FileDescrip fd = new FileDescrip(input, root);
- assertTrue(fd.isSet());
+ Path root = new Path("file:///a/b");
+ Path input = new Path("file:///a/b/foo.csv");
+ FileDescrip fd = fileDescrip(input, root);
assertSame(input, fd.filePath());
assertEquals(0, fd.dirPathLength());
assertNull(fd.partition(0));
@@ -105,10 +98,9 @@
*/
@Test
public void testBelowRoot() {
- Path root = new Path("hdfs://a/b");
- Path input = new Path("hdfs://a/b/c/foo.csv");
- FileDescrip fd = new FileDescrip(input, root);
- assertTrue(fd.isSet());
+ Path root = new Path("file:///a/b");
+ Path input = new Path("file:///a/b/c/foo.csv");
+ FileDescrip fd = fileDescrip(input, root);
assertSame(input, fd.filePath());
assertEquals(1, fd.dirPathLength());
assertEquals("c", fd.partition(0));
@@ -121,10 +113,10 @@
*/
@Test
public void testAboveRoot() {
- Path root = new Path("hdfs://a/b");
- Path input = new Path("hdfs://a/foo.csv");
+ Path root = new Path("file:///a/b");
+ Path input = new Path("file:///a/foo.csv");
try {
- new FileDescrip(input, root);
+ fileDescrip(input, root);
fail();
} catch (IllegalArgumentException e) {
// Expected
@@ -137,10 +129,10 @@
*/
@Test
public void testDisjointPath() {
- Path root = new Path("hdfs://a/b");
- Path input = new Path("hdfs://d/foo.csv");
+ Path root = new Path("file:///a/b");
+ Path input = new Path("file:///d/foo.csv");
try {
- new FileDescrip(input, root);
+ fileDescrip(input, root);
fail();
} catch (IllegalArgumentException e) {
// Expected
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java
index 1f45f6b..9f7d4c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java
@@ -21,7 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -43,7 +43,7 @@
* Focuses on the file implicit columns, assumes that other tests have
* verified the underlying mechanisms.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestFileScan extends BaseFileScanTest {
/**
@@ -57,8 +57,9 @@
public MockLateSchemaReader(FileSchemaNegotiator negotiator) {
// Verify the path
- assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.fileWork().getPath().toString());
- assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.split().getPath().toString());
+ assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.file().filePath().toString());
+ assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.file().fileWork().getPath().toString());
+ assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.file().split().getPath().toString());
// No schema or file, just build the table loader.
tableLoader = negotiator.build();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java
index 2b6f0cf..00931db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java
@@ -25,7 +25,7 @@
import java.util.Collections;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
@@ -43,7 +43,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestFileScanLifecycle extends BaseTestScanLifecycle implements MockFileNames {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java
index 4b64ef5..e064a20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java
@@ -20,13 +20,14 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.io.IOException;
import java.util.List;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
-import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.BaseTestScanLifecycle.DummySubScan;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaConfigBuilder;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
@@ -35,8 +36,13 @@
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -45,19 +51,51 @@
* Tests the file implicit column handler which identifies implicit columns
* and populates them. Assumes that the implicit column parser tests pass.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestImplicitColumnLoader extends SubOperatorTest implements MockFileNames {
+ private static final DrillFileSystem dfs;
+
+ static {
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+ try {
+ dfs = new DrillFileSystem(conf);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static class ImplicitFixture {
+ final FileScanLifecycleBuilder options;
+ final ScanSchemaTracker schemaTracker;
+ FileDescrip fileDescrip;
+ ImplicitFileColumnsHandler handler;
+
+ public ImplicitFixture(List<SchemaPath> projList, Path root) {
+ this.options = new FileScanLifecycleBuilder();
+ options.rootDir(root);
+ this.schemaTracker = new ScanSchemaConfigBuilder()
+ .projection(projList)
+ .build();
+ }
+
+ public void build(Path input) {
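+      // Build the implicit-columns handler, then describe the input file as a
+      // single mock split (offset 0, length 1,000 bytes) to obtain its FileDescrip.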
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ handler = new ImplicitFileColumnsHandler(
+ dfs, fixture.getOptionManager(), options, cache, schemaTracker);
+ FileWork fileWork = new FileWorkImpl(0, 1000, input);
+ fileDescrip = handler.makeDescrip(fileWork);
+ }
+
+ public StaticBatchBuilder batchBuilder() {
+ return handler.forFile(fileDescrip);
+ }
+ }
public StaticBatchBuilder buildHandler(List<SchemaPath> projList, Path root, Path input) {
- FileScanLifecycleBuilder options = new FileScanLifecycleBuilder();
- options.rootDir(root);
- final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
- final ScanSchemaTracker schemaTracker = new ScanSchemaConfigBuilder()
- .projection(projList)
- .build();
- ImplicitFileColumnsHandler handler = new ImplicitFileColumnsHandler(
- fixture.operatorContext(new DummySubScan()), options, cache, schemaTracker);
- return handler.forFile(input);
+ ImplicitFixture fixture = new ImplicitFixture(projList, root);
+ fixture.build(input);
+ return fixture.batchBuilder();
}
@Test
@@ -84,9 +122,9 @@
}
@Test
- public void testAllColumns() {
+ public void testNonInternalColumns() {
StaticBatchBuilder batchLoader = buildHandler(
- ScanTestUtils.projectAllWithMetadata(3),
+ ScanTestUtils.projectAllWithFileImplicit(3),
MOCK_ROOT_PATH, MOCK_FILE_PATH);
assertNotNull(batchLoader);
batchLoader.load(2);
@@ -106,4 +144,59 @@
.build();
RowSetUtilities.verify(expected, fixture.wrap(batchLoader.outputContainer()));
}
+
+ @Test
+ public void testInternalColumns() {
+ ImplicitFixture testFixture = new ImplicitFixture(
+ RowSetTestUtils.projectList(
+ ScanTestUtils.LAST_MODIFIED_TIME_COL,
+ ScanTestUtils.PROJECT_METADATA_COL,
+ ScanTestUtils.ROW_GROUP_INDEX_COL,
+ ScanTestUtils.ROW_GROUP_START_COL,
+ ScanTestUtils.ROW_GROUP_LENGTH_COL),
+ MOCK_ROOT_PATH);
+ testFixture.build(MOCK_FILE_PATH);
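+    // Simulated row group attributes: index 10, start 10,000, length 5,000;
+    // these values surface below as the row-group implicit column values.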
+ testFixture.fileDescrip.setRowGroupAttribs(10, 10_000, 5_000);
+ testFixture.fileDescrip.setModTime("123456789");
+ StaticBatchBuilder batchLoader = testFixture.batchBuilder();
+ assertNotNull(batchLoader);
+ batchLoader.load(2);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add(ScanTestUtils.LAST_MODIFIED_TIME_COL, MinorType.VARCHAR)
+ .addNullable(ScanTestUtils.PROJECT_METADATA_COL, MinorType.VARCHAR)
+ .add(ScanTestUtils.ROW_GROUP_INDEX_COL, MinorType.VARCHAR)
+ .add(ScanTestUtils.ROW_GROUP_START_COL, MinorType.VARCHAR)
+ .add(ScanTestUtils.ROW_GROUP_LENGTH_COL, MinorType.VARCHAR)
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("123456789", null, "10", "10000", "5000")
+ .addRow("123456789", null, "10", "10000", "5000")
+ .build();
+ RowSetUtilities.verify(expected, fixture.wrap(batchLoader.outputContainer()));
+ }
+
+ @Test
+ public void testInternalEmptyFile() {
+ ImplicitFixture testFixture = new ImplicitFixture(
+ RowSetTestUtils.projectList(
+ ScanTestUtils.LAST_MODIFIED_TIME_COL,
+ ScanTestUtils.PROJECT_METADATA_COL),
+ MOCK_ROOT_PATH);
+ testFixture.build(MOCK_FILE_PATH);
+ testFixture.fileDescrip.setModTime("123456789");
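+    // Marking the file empty is expected to make the $project_metadata$ column
+    // report "false" rather than null in the batch verified below.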
+ testFixture.fileDescrip.markEmpty();
+ StaticBatchBuilder batchLoader = testFixture.batchBuilder();
+ assertNotNull(batchLoader);
+ batchLoader.load(1);
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add(ScanTestUtils.LAST_MODIFIED_TIME_COL, MinorType.VARCHAR)
+ .addNullable(ScanTestUtils.PROJECT_METADATA_COL, MinorType.VARCHAR)
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("123456789", "false")
+ .build();
+ RowSetUtilities.verify(expected, fixture.wrap(batchLoader.outputContainer()));
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java
index a87e330..f10083d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java
@@ -22,21 +22,20 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ImplicitColumnOptions;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ParseResult;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema;
import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema.ColumnHandle;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ImplicitColumnOptions;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ParseResult;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectionSchemaTracker;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
@@ -49,11 +48,14 @@
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.test.SubOperatorTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestImplicitColumnResolver extends SubOperatorTest {
private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
@@ -77,7 +79,7 @@
}
private boolean isImplicit(List<ColumnHandle> cols, int index) {
- return SchemaUtils.isImplicit(cols.get(index).column());
+ return cols.get(index).isImplicit();
}
@Test
@@ -87,6 +89,7 @@
ParseResult result = parseFixture.parseImplicit();
assertTrue(result.columns().isEmpty());
assertTrue(result.schema().isEmpty());
+ assertFalse(result.isMetadataScan());
}
/**
@@ -103,6 +106,7 @@
ScanTestUtils.SUFFIX_COL));
ParseResult result = parseFixture.parseImplicit();
+ assertFalse(result.isMetadataScan());
assertEquals(4, result.columns().size());
TupleMetadata expected = new SchemaBuilder()
@@ -132,6 +136,7 @@
ParserFixture parseFixture = new ParserFixture(
RowSetTestUtils.projectList(dir2, dir1, dir0, "a"));
ParseResult result = parseFixture.parseImplicit();
+ assertFalse(result.isMetadataScan());
assertEquals(3, result.columns().size());
TupleMetadata expected = new SchemaBuilder()
@@ -156,6 +161,7 @@
.maxPartitionDepth(3)
.useLegacyWildcardExpansion(true);
ParseResult result = parseFixture.parseImplicit();
+ assertFalse(result.isMetadataScan());
assertEquals(3, result.columns().size());
TupleMetadata expected = new SchemaBuilder()
@@ -535,12 +541,6 @@
assertTrue(schemaTracker.isResolved());
assertSame(ProjectionType.SOME, schemaTracker.projectionType());
- // Implicit columns should be marked
- MutableTupleSchema internalSchema = schemaTracker.internalSchema();
- assertFalse(internalSchema.find("a").isImplicit());
- assertTrue(internalSchema.find(ScanTestUtils.FILE_NAME_COL).isImplicit());
- assertTrue(internalSchema.find(ScanTestUtils.partitionColName(2)).isImplicit());
-
ImplicitColumnOptions options = new ImplicitColumnOptions()
.optionSet(fixture.getOptionManager());
ImplicitColumnResolver parser = new ImplicitColumnResolver(options, ERROR_CONTEXT);
@@ -552,4 +552,62 @@
assertEquals(definedSchema, schemaTracker.outputSchema());
}
+
+ /**
+ * Test including internal implicit columns in the project list.
+ * @throws IOException
+ */
+ @Test
+ public void testInternalImplicitColumnSelection() throws IOException {
+ // Simulate SELECT lmt, $project_metadata$", rgi, rgs, rgl ...
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+ DrillFileSystem dfs = new DrillFileSystem(conf);
+ ParserFixture parseFixture = new ParserFixture(
+ RowSetTestUtils.projectList("a",
+ ScanTestUtils.LAST_MODIFIED_TIME_COL,
+ ScanTestUtils.PROJECT_METADATA_COL,
+ ScanTestUtils.ROW_GROUP_INDEX_COL,
+ ScanTestUtils.ROW_GROUP_START_COL,
+ ScanTestUtils.ROW_GROUP_LENGTH_COL));
+ parseFixture.options.dfs(dfs);
+ ParseResult result = parseFixture.parseImplicit();
+
+ assertTrue(result.isMetadataScan());
+ assertEquals(5, result.columns().size());
+
+ TupleMetadata expected = new SchemaBuilder()
+ .add(ScanTestUtils.LAST_MODIFIED_TIME_COL, MinorType.VARCHAR)
+ .addNullable(ScanTestUtils.PROJECT_METADATA_COL, MinorType.VARCHAR)
+ .add(ScanTestUtils.ROW_GROUP_INDEX_COL, MinorType.VARCHAR)
+ .add(ScanTestUtils.ROW_GROUP_START_COL, MinorType.VARCHAR)
+ .add(ScanTestUtils.ROW_GROUP_LENGTH_COL, MinorType.VARCHAR)
+ .build();
+ assertEquals(expected, result.schema());
+
+ List<ColumnHandle> cols = parseFixture.tracker.internalSchema().columns();
+ assertFalse(isImplicit(cols, 0));
+ assertTrue(isImplicit(cols, 1));
+ assertTrue(isImplicit(cols, 2));
+ assertTrue(isImplicit(cols, 3));
+ assertTrue(isImplicit(cols, 4));
+ assertTrue(isImplicit(cols, 5));
+ }
+
+ @Test
+ public void testProvidedImplicitColInternal() {
+ TupleMetadata providedSchema = new SchemaBuilder()
+ .add("myLmt", MinorType.INT)
+ .build();
+ SchemaUtils.markImplicit(providedSchema.metadata("myLmt"), ScanTestUtils.LAST_MODIFIED_TIME_COL);
+
+ ParserFixture parseFixture = new ParserFixture(
+ RowSetTestUtils.projectAll());
+ parseFixture.tracker.applyProvidedSchema(providedSchema);
+ try {
+ parseFixture.parseImplicit();
+ } catch (UserException e) {
+ assertTrue(e.getMessage().contains("references an undefined implicit column type"));
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
index 5468175..66f10fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
@@ -135,7 +135,6 @@
public void close() {}
}
-
/**
* Mock reader with no data or schema, indicated by an early EOF
* exception.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java
index 7e12260..b6becd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java
@@ -21,7 +21,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -49,7 +49,7 @@
* can create the classic nullable Int null column, or one of
* any other type and mode.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestMissingColumnLoader extends SubOperatorTest {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
index 0ec3196..ff83b1d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
@@ -22,7 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.OutputBatchBuilder.BatchSource;
import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -36,7 +36,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestOutputBatchBuilder extends SubOperatorTest {
public TupleMetadata firstSchema() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java
index 1500295..7809904 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java
@@ -22,7 +22,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
@@ -35,7 +35,7 @@
* Verifies proper handling of errors from a reader, including use of the
* scan and reader error contexts.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestReaderErrors extends BaseTestScanLifecycle {
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java
index 75700c1..24a083d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java
@@ -23,7 +23,7 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
@@ -34,13 +34,14 @@
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanLifecycleBasics extends BaseTestScanLifecycle {
/**
@@ -122,11 +123,17 @@
// Early schema: so output schema is available after open
assertTrue(scan.hasOutputSchema());
assertEquals(SCHEMA, scan.outputSchema());
- assertFalse(reader.next());
- reader.close();
+ assertTrue(reader.next());
+ VectorContainer result = reader.output();
+ assertEquals(0, result.getRecordCount());
+ result.zeroVectors();
// Early schema with no additional columns discovered
assertEquals(SCHEMA, scan.outputSchema());
+
+ // But, no second batch.
+ assertFalse(reader.next());
+ reader.close();
scan.close();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java
index 3e3b833..334753d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java
@@ -23,7 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
@@ -37,7 +37,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanLifecycleSchema extends BaseTestScanLifecycle {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java
index 9815b94..e9c571d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java
@@ -21,7 +21,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
@@ -37,7 +37,7 @@
* Test two readers in succession in various cases: empty readers, normal readers,
* type conflicts, etc.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanLifecycleTwoReaders extends BaseTestScanLifecycle {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java
index 4dcc4ec..b7fa328 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java
@@ -24,7 +24,7 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -39,7 +39,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestDynamicSchemaFilter {
private static final ColumnMetadata A_COL =
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java
index 491be8b..57277b5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java
@@ -20,7 +20,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
@@ -37,7 +37,7 @@
* to see if the projection path is consistent with the type. Tests here
* verify the consistency checks.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestProjectedPath {
// INT is a proxy for all scalar columns.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java
index 02d05a1..d67d7e6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java
@@ -27,7 +27,7 @@
import java.util.Collections;
import java.util.List;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
@@ -46,7 +46,7 @@
* parsing; the only bits not tested here is that which is
* inherently specific to some use case.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestProjectionParser extends BaseTest {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java
index 6ccae71..075e774 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java
@@ -23,10 +23,11 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
@@ -43,7 +44,7 @@
* Test the scan operator schema tracker which computes the final
* output schema from a variety of sources.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanSchemaTracker extends BaseTest {
private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
@@ -294,6 +295,38 @@
assertEquals(expected, outputSchema);
}
+ /**
+ * Test for a query of the form:<br>
+   * {@code SELECT * FROM t ORDER BY a}<br>
+   * in which we get a projection list of the form<br>
+   * {@code [`**`, `a`]}<br>
+   * If we are given a provided schema of {@code (a, b, c)},
+   * the "natural" expansion will be {@code (b, c, a)}, but we
+   * add a hack to get what the user expects: {@code (a, b, c)}.
+   * The "natural" expansion occurs because the projection list says
+   * "add all columns to the wildcard position except those mentioned
+   * elsewhere". We essentially redefine it as "add or move all columns
+   * in provided schema order."
+ */
+ @Test
+ public void testWildcardWithExplicitWithProvided() {
+
+ // Simulate SELECT *, a ...
+ final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+ .projection(RowSetTestUtils.projectList(
+ SchemaPath.DYNAMIC_STAR, "a"));
+
+ final TupleMetadata providedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+ builder.providedSchema(providedSchema);
+ final ScanSchemaTracker schemaTracker = builder.build();
+ assertTrue(schemaTracker.isResolved());
+ assertEquals(providedSchema, schemaTracker.outputSchema());
+ }
+
@Test
public void testStrictProvidedSchemaWithWildcard() {
@@ -467,4 +500,57 @@
final TupleMetadata outputSchema = schemaTracker.outputSchema();
assertTrue(outputSchema.isEmpty());
}
+
+ /**
+ * Test for a query of the form:<br>
+   * {@code SELECT * FROM t ORDER BY a}<br>
+   * in which we get a projection list of the form<br>
+   * {@code [`**`, `a`]}<br>
+   * If we are given a reader schema of {@code (a, b, c)},
+   * the "natural" expansion will be {@code (b, c, a)}, but we
+   * add a hack to get what the user expects: {@code (a, b, c)}.
+   * The "natural" expansion occurs because the projection list says
+   * "add all columns to the wildcard position except those mentioned
+   * elsewhere". We essentially redefine it as "add or move all columns
+   * in reader schema order."
+ */
+ @Test
+ public void testWildcardWithExplicitWithReaderSchema() {
+
+ // Simulate SELECT *, a ...
+ final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+ .projection(RowSetTestUtils.projectList(
+ SchemaPath.DYNAMIC_STAR, "a"));
+
+ final TupleMetadata readerOutputSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+ final ScanSchemaTracker schemaTracker = builder.build();
+ schemaTracker.applyReaderSchema(readerOutputSchema, ERROR_CONTEXT);
+ assertTrue(schemaTracker.isResolved());
+ assertEquals(readerOutputSchema, schemaTracker.outputSchema());
+ }
+
+ @Test
+ public void testWildcardWithExplicitWithProvidedAndReaderSchema() {
+
+ // Simulate SELECT *, a ...
+ final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+ .projection(RowSetTestUtils.projectList(
+ SchemaPath.DYNAMIC_STAR, "a"));
+
+ final TupleMetadata providedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+ builder.providedSchema(providedSchema);
+ final ScanSchemaTracker schemaTracker = builder.build();
+ schemaTracker.applyReaderSchema(providedSchema, ERROR_CONTEXT);
+ assertTrue(schemaTracker.isResolved());
+ assertEquals(providedSchema, schemaTracker.outputSchema());
+ }
+
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java
index c7e4851..dda57c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java
@@ -23,7 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -44,7 +44,7 @@
* defined schema, scan schema, reader schema and missing columns schemas
* must all be trees, and must all be kept in sync.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestScanSchemaTrackerMaps extends BaseTest {
private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java
index 76aba7f..fc343bd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java
@@ -23,7 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.expression.SchemaPath;
@@ -38,7 +38,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestSchemaTrackerDefined {
private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java
index af74205..5dab1ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java
@@ -22,13 +22,15 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ImplicitColumnOptions;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ImplicitColumnOptions;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -41,7 +43,7 @@
* Basic tests of early reader against a project list are in
* {@link TestSchemaTrackerInputSchema}.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestSchemaTrackerEarlyReaderSchema extends SubOperatorTest {
protected static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
protected static final TupleMetadata SCHEMA = BaseTestSchemaTracker.SCHEMA;
@@ -169,4 +171,23 @@
assertTrue(e.getMessage().contains("Reader column: `a` INT"));
}
}
+
+ @Test
+ public void testWildcardWithExplicitWithEarlyReaderSchema() {
+
+ // Simulate SELECT *, a ...
+ final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+ .projection(RowSetTestUtils.projectList(
+ SchemaPath.DYNAMIC_STAR, "a"));
+
+ final TupleMetadata readerSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.VARCHAR)
+ .buildSchema();
+ final ScanSchemaTracker schemaTracker = builder.build();
+ schemaTracker.applyEarlyReaderSchema(readerSchema);
+ assertTrue(schemaTracker.isResolved());
+ assertEquals(readerSchema, schemaTracker.outputSchema());
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java
index 40e860f..2113d01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java
@@ -27,7 +27,7 @@
import java.util.Collections;
import java.util.function.Consumer;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -47,7 +47,7 @@
* before any data is seen. There are subtle differences explored
* here and in {@link TestSchemaTrackerEarlyReaderSchema}.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestSchemaTrackerInputSchema extends BaseTestSchemaTracker {
private void testBoth(Collection<SchemaPath> projList, TupleMetadata schema,
@@ -175,25 +175,6 @@
});
}
- /**
- * Verify a reasonable error if the name of an implied implicit
- * column (one that appears with a wildcard) conflicts with a
- * provided column name. We cannot project two columns with that
- * same name. Does not occur for a reader schema.
- */
- @Test
- public void testWithWildcardImplicitConflict() {
- ProjectionSchemaTracker tracker = trackerFor(
- RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR, "y"));
- try {
- tracker.applyProvidedSchema(SCHEMA);
- fail();
- } catch (UserException e) {
- assertTrue(e.getMessage().contains("implicit"));
- assertTrue(e.getMessage().contains("Column: a"));
- }
- }
-
@Test
public void testProvidedMapProjectConflict() {
ProjectionSchemaTracker tracker = trackerFor(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java
index 9587e9c..ffb1854 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java
@@ -19,13 +19,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.Collections;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.expression.SchemaPath;
@@ -41,7 +42,7 @@
* Test the first step of scan schema resolution: translating from the
* projection parser to a dynamic schema ready for resolution.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestSchemaTrackerProjection extends BaseTest {
private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
@@ -55,7 +56,7 @@
ProjectionSchemaTracker tracker = schemaTracker(
Collections.emptyList());
assertTrue(tracker.isResolved());
- assertEquals(0, tracker.schemaVersion());
+ assertEquals(1, tracker.schemaVersion());
assertSame(ScanSchemaTracker.ProjectionType.NONE, tracker.projectionType());
assertTrue(tracker.internalSchema().toSchema().isEmpty());
ProjectionFilter filter = tracker.projectionFilter(ERROR_CONTEXT);
@@ -67,7 +68,7 @@
ProjectionSchemaTracker tracker = schemaTracker(
RowSetTestUtils.projectAll());
assertFalse(tracker.isResolved());
- assertEquals(0, tracker.schemaVersion());
+ assertEquals(1, tracker.schemaVersion());
assertSame(ScanSchemaTracker.ProjectionType.ALL, tracker.projectionType());
assertTrue(tracker.internalSchema().toSchema().isEmpty());
ProjectionFilter filter = tracker.projectionFilter(ERROR_CONTEXT);
@@ -81,6 +82,8 @@
assertFalse(tracker.isResolved());
assertTrue(0 < tracker.schemaVersion());
assertSame(ScanSchemaTracker.ProjectionType.ALL, tracker.projectionType());
+ assertNotNull(tracker.columnProjection("a"));
+ assertNotNull(tracker.columnProjection("b"));
TupleMetadata schema = tracker.internalSchema().toSchema();
assertEquals(2, schema.size());
assertTrue(schema.metadata(0).isDynamic());
@@ -100,4 +103,21 @@
ProjectionFilter filter = tracker.projectionFilter(ERROR_CONTEXT);
assertTrue(filter instanceof DynamicSchemaFilter);
}
+
+ @Test
+ public void testExplicitArray() {
+ ProjectionSchemaTracker tracker = schemaTracker(
+ RowSetTestUtils.projectList("a[1]", "a[3]"));
+ assertSame(ScanSchemaTracker.ProjectionType.SOME, tracker.projectionType());
+
+ ProjectedColumn projCol = tracker.columnProjection("a");
+ assertNotNull(projCol);
+ boolean[] indexes = projCol.indexes();
+ assertNotNull(indexes);
+ assertEquals(4, indexes.length);
+ assertFalse(indexes[0]);
+ assertTrue(indexes[1]);
+ assertFalse(indexes[2]);
+ assertTrue(indexes[3]);
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
index 1bee697..1c8ed95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
@@ -30,7 +30,7 @@
import java.util.Arrays;
import java.util.List;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
@@ -61,7 +61,7 @@
/**
* Test of the basics of the projection mechanism.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestResultSetLoaderProjection extends SubOperatorTest {
/**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
index e4ce405..d4ae1ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
@@ -23,7 +23,7 @@
import java.nio.file.Paths;
import java.util.List;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.TestBuilder;
@@ -31,7 +31,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestCsvHeader extends BaseTestQuery{
private static final String ROOT = "store/text/data/cars.csvh";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
index 0b3c7d6..80c5ca4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
@@ -22,7 +22,7 @@
import java.io.File;
import java.io.IOException;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -33,7 +33,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestCsvIgnoreHeaders extends BaseCsvTest{
private static String withHeaders[] = {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
index 57ae662..9912ca4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.store.easy.text.compliant;
import org.apache.drill.TestSelectWithOption;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -53,7 +53,7 @@
* @see TestSelectWithOption for similar tests using table
* properties within SQL
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestCsvTableProperties extends BaseCsvTest {
@BeforeClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 03a98ac..a0bfbbf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -23,7 +23,7 @@
import java.io.PrintWriter;
import java.util.Iterator;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -60,7 +60,7 @@
*
* @see TestHeaderBuilder
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestCsvWithHeaders extends BaseCsvTest {
private static final String TEST_FILE_NAME = "basic.csv";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 0458aee..ac27d2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -24,7 +24,7 @@
import java.util.Iterator;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -47,7 +47,7 @@
* The focus here is on the schema mechanism, which can be explored
* with simple tables of just a few rows.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestCsvWithSchema extends BaseCsvTest {
protected static final String FILE1_NAME = "file1.csv";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 872574c..c724a71 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -26,7 +26,7 @@
import java.io.IOException;
import java.util.Iterator;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.TestEmptyInputSql;
@@ -47,7 +47,7 @@
* and without an external schema. Data is represented with the
* `columns` array column.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestCsvWithoutHeaders extends BaseCsvTest {
private static final String TEST_FILE_NAME = "simple.csv";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
index 76c9d5d..ab4ee01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
@@ -25,7 +25,7 @@
import java.io.IOException;
import java.util.Iterator;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -54,7 +54,7 @@
* current "V3" version. The tests here verify this behavior.
*/
-@Category(EvfTests.class)
+@Category(EvfTest.class)
public class TestPartitionRace extends BaseCsvTest {
@BeforeClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index e451dbb..0be54f3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -329,12 +329,11 @@
* Run the query and return the first non-empty batch as a
* {@link DirectRowSet} object that can be inspected directly
* by the code using a {@link RowSetReader}.
- * <p>
*
* @see #rowSetIterator() for a version that reads a series of
* batches as row sets.
* @return a row set that represents the first non-empty batch returned from
- * the query
+ * the query, or {@code null} if the query returns no data (no batches)
* @throws RpcException if anything goes wrong
*/
public DirectRowSet rowSet() throws RpcException {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 9aee7eb..b21e22a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -241,6 +241,10 @@
return new PrimitiveColumnMetadata(field);
}
+ public static boolean isScalar(ColumnMetadata col) {
+ return isScalar(col.type());
+ }
+
public static boolean isScalar(MinorType type) {
return !isComplex(type);
}