DRILL-7725: Updates to the EVF2 framework

* Supports internal implicit columns
* Improves support for standard conversions (usage sketches below)
* Handles several reader corner cases
* Simplifies the file reader
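
Example usage of the reworked StandardConversions factory and the new
FixedReceiver. These are sketches drawn from the changes in this diff, not
tested code; providedSchema, tupleWriter, index, negotiator, readerSchema,
moreRows() and nextValue() stand in for whatever the calling reader already has.

    // StandardConversions: the old constructors are replaced by a builder,
    // and converterFor() is now an instance method.
    StandardConversions conversions = StandardConversions.builder()
        .withSchema(providedSchema)               // copies tuple-level properties, if any
        .blankAs(ColumnMetadata.BLANK_AS_ZERO)    // optional blank-handling override
        .build();
    ValueWriter writer = conversions.converterFor(
        tupleWriter.scalar(index), MinorType.VARCHAR);

    // FixedReceiver: wraps the ResultSetLoader for early-schema readers and
    // applies the standard conversions automatically (assumes the usual
    // ValueWriter set methods, e.g. setString()).
    FixedReceiver receiver = FixedReceiver.builderFor(negotiator)
        .schemaIsComplete()
        .build(readerSchema);        // merges the provided and reader schemas
    while (moreRows() && receiver.start()) {
      receiver.scalar("name").setString(nextValue());
      receiver.save();
    }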

closes #2073
diff --git a/common/src/test/java/org/apache/drill/categories/EvfTests.java b/common/src/test/java/org/apache/drill/categories/EvfTest.java
similarity index 97%
rename from common/src/test/java/org/apache/drill/categories/EvfTests.java
rename to common/src/test/java/org/apache/drill/categories/EvfTest.java
index 7bc771e..e2fc1a7 100644
--- a/common/src/test/java/org/apache/drill/categories/EvfTests.java
+++ b/common/src/test/java/org/apache/drill/categories/EvfTest.java
@@ -21,6 +21,6 @@
  * A category for tests that test the "Extended Vector Framework"
  * (EVF): the mechanism that drives the plugin-based scan operator.
  */
-public interface EvfTests {
+public interface EvfTest {
   // Junit category marker
 }
diff --git a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java
index 6d45e06..6690df8 100644
--- a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java
+++ b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/writers/WriterSpec.java
@@ -44,7 +44,9 @@
     this.tupleWriter = tupleWriter;
     this.providedSchema = providedSchema;
     this.errorContext = errorContext;
-    this.conversions = new StandardConversions(providedSchema);
+    this.conversions = StandardConversions.builder()
+        .withSchema(providedSchema)
+        .build();
   }
 
   public ValueWriter makeWriter(String name, MinorType type, DataMode mode) {
@@ -56,7 +58,7 @@
       return tupleWriter.scalar(index);
     } else {
       int index = tupleWriter.addColumn(providedCol);
-      return conversions.converter(tupleWriter.scalar(index), type);
+      return conversions.converterFor(tupleWriter.scalar(index), type);
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index b6c77ca..699eb1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -263,7 +263,7 @@
       }
 
       // Another reader available?
-      if (! nextReader()) {
+      if (!nextReader()) {
         finalizeResults();
         return;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
index 17b3f68..842c798 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
@@ -110,5 +110,6 @@
   public static TupleMetadata columnsSchema() {
     return new SchemaBuilder()
       .addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
-      .buildSchema();  }
+      .buildSchema();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java
index f31ee69..76d2616 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/AbstractConvertFromString.java
@@ -95,10 +95,10 @@
    * @param properties optional framework-specific properties
    * @return a function to call to prepare each string value for conversion
    */
-  private Function<String,String> buildPrepare(ColumnMetadata schema,
+  private Function<String, String> buildPrepare(ColumnMetadata schema,
       Map<String, String> properties) {
 
-    String blankProp = properties.get(ColumnMetadata.BLANK_AS_PROP);
+    String blankProp = properties == null ? null : properties.get(ColumnMetadata.BLANK_AS_PROP);
     if (blankProp != null) {
       switch (blankProp.toLowerCase()) {
         case ColumnMetadata.BLANK_AS_ZERO:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
index 7abaac5..396e741 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/convert/StandardConversions.java
@@ -24,21 +24,13 @@
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueWriter;
 
 /**
  * Factory for standard conversions as outlined in the package header.
- * Can be used in two ways:
- * <ul>
- * <li>As an object with a default set of properties that apply to
- * all columns.</li>
- * <li>As a set of static functions which operate on each column
- * individually.</li>
- * </ul>
- * The object form is more typical: it allows the provided schema to
- * set general blank-handling behavior at the tuple level.
+ * Use the builder to add schema-wide properties from a provided schema,
+ * from context-specific properties, or from both.
  * <p>
  * The class provides two kinds of information:
  * <p>
@@ -111,6 +103,42 @@
     }
   }
 
+  public static class Builder {
+
+    private final Map<String, String> properties = new HashMap<>();
+
+    public Builder withSchema(TupleMetadata providedSchema) {
+      if (providedSchema != null && providedSchema.hasProperties()) {
+        properties.putAll(providedSchema.properties());
+      }
+      return this;
+    }
+
+    public Builder withProperties(Map<String, String> properties) {
+      if (properties != null) {
+        this.properties.putAll(properties);
+      }
+      return this;
+    }
+
+    public Builder property(String key, String value) {
+      if (value == null) {
+        properties.remove(key);
+      } else {
+        properties.put(key, value);
+      }
+      return this;
+    }
+
+    public Builder blankAs(String value) {
+      return property(ColumnMetadata.BLANK_AS_PROP, value);
+    }
+
+    public StandardConversions build() {
+      return new StandardConversions(properties);
+    }
+  }
+
   public static final ConversionDefn IMPLICIT =
       new ConversionDefn(ConversionType.IMPLICIT);
   public static final ConversionDefn IMPLICIT_UNSAFE =
@@ -120,32 +148,14 @@
 
   private final Map<String, String> properties;
 
-  public StandardConversions() {
-    this.properties = null;
-  }
-
-  public StandardConversions(Map<String,String> properties) {
+  private StandardConversions(Map<String, String> properties) {
     this.properties = properties;
   }
 
-  public StandardConversions(String blankAsProp) {
-    if (blankAsProp == null) {
-      this.properties = null;
-    } else {
-      this.properties = new HashMap<>();
-      this.properties.put(ColumnMetadata.BLANK_AS_PROP, blankAsProp);
-    }
-  }
+  public static Builder builder() { return new Builder(); }
 
-  public StandardConversions(TupleMetadata providedSchema) {
-    if (providedSchema == null || !providedSchema.hasProperties()) {
-      this.properties = null;
-    } else {
-      this.properties = providedSchema.properties();
-    }
-  }
-
-  private Map<String,String> merge(Map<String,String> specificProps) {
+  private static Map<String, String> mergeProperties(Map<String, String> properties,
+      Map<String, String> specificProps) {
     if (properties == null) {
       return specificProps;
     } else if (specificProps == null) {
@@ -157,15 +167,19 @@
     return merged;
   }
 
-  public static DirectConverter newInstance(
+  private Map<String, String> mergeProperties(Map<String, String> specificProps) {
+    return mergeProperties(properties, specificProps);
+  }
+
+  public DirectConverter newInstance(
       Class<? extends DirectConverter> conversionClass, ScalarWriter baseWriter,
-      Map<String,String> properties) {
+      Map<String, String> properties) {
 
     // Try the Converter(ScalerWriter writer, Map<String, String> props) constructor first.
     // This first form is optional.
     try {
       final Constructor<? extends DirectConverter> ctor = conversionClass.getDeclaredConstructor(ScalarWriter.class, Map.class);
-      return ctor.newInstance(baseWriter, properties);
+      return ctor.newInstance(baseWriter, mergeProperties(properties));
     } catch (final ReflectiveOperationException e) {
       // Ignore
     }
@@ -174,7 +188,7 @@
     return newInstance(conversionClass, baseWriter);
   }
 
-  public static DirectConverter newInstance(
+  public DirectConverter newInstance(
       Class<? extends DirectConverter> conversionClass, ScalarWriter baseWriter) {
     try {
       final Constructor<? extends DirectConverter> ctor = conversionClass.getDeclaredConstructor(ScalarWriter.class);
@@ -196,11 +210,11 @@
    * @return a description of the conversion needed (if any), along with the
    * standard conversion class, if available
    */
-  public static ConversionDefn analyze(ColumnMetadata inputSchema, ColumnMetadata outputSchema) {
+  public ConversionDefn analyze(ColumnMetadata inputSchema, ColumnMetadata outputSchema) {
     return analyze(inputSchema.type(), outputSchema);
   }
 
-  public static ConversionDefn analyze(MinorType inputType, ColumnMetadata outputSchema) {
+  public ConversionDefn analyze(MinorType inputType, ColumnMetadata outputSchema) {
     if (inputType == outputSchema.type()) {
       return new ConversionDefn(ConversionType.NONE);
     }
@@ -364,8 +378,7 @@
     return EXPLICIT;
   }
 
-  public static Class<? extends DirectConverter> convertFromVarchar(
-      ColumnMetadata outputDefn) {
+  public Class<? extends DirectConverter> convertFromVarchar(ColumnMetadata outputDefn) {
     switch (outputDefn.type()) {
       case BIT:
         return ConvertStringToBoolean.class;
@@ -404,75 +417,33 @@
    *
    * @param scalarWriter the output column writer
    * @param inputType the type of the input data
-   * @param properties optional properties for some string-based conversions
+   * @param columnProps optional properties for some string-based conversions
    * @return a column converter, if needed and available, the input writer if
    * no conversion is needed, or null if there is no conversion available
    */
-  public static ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType, Map<String, String> properties) {
+  public ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType, Map<String, String> columnProps) {
     ConversionDefn defn = analyze(inputType, scalarWriter.schema());
     switch (defn.type) {
       case EXPLICIT:
         if (defn.conversionClass != null) {
-          return newInstance(defn.conversionClass, scalarWriter, properties);
+          return newInstance(defn.conversionClass, scalarWriter, columnProps);
         } else {
           return null;
         }
       case IMPLICIT:
       case IMPLICIT_UNSAFE:
-        return newInstance(defn.conversionClass, scalarWriter, properties);
+        return newInstance(defn.conversionClass, scalarWriter, columnProps);
       default:
         return scalarWriter;
     }
   }
 
-  public static ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType) {
+  public ValueWriter converterFor(ScalarWriter scalarWriter, MinorType inputType) {
     return converterFor(scalarWriter, inputType, null);
   }
 
-  public static ValueWriter converterFor(ScalarWriter scalarWriter, ColumnMetadata inputSchema) {
-    return converterFor(scalarWriter, inputSchema.type(), inputSchema.properties());
-  }
-
-  public ValueWriter converter(ScalarWriter scalarWriter, ColumnMetadata inputSchema) {
+  public ValueWriter converterFor(ScalarWriter scalarWriter, ColumnMetadata inputSchema) {
     return converterFor(scalarWriter, inputSchema.type(),
-        merge(inputSchema.properties()));
-  }
-
-  public ValueWriter converter(ScalarWriter scalarWriter, MinorType inputType) {
-    return converterFor(scalarWriter, inputType, properties);
-  }
-
-  /**
-   * Given a desired provided schema and an actual reader schema, create a merged
-   * schema that contains the provided column where available, but the reader
-   * column otherwise. Copies provided properties to the output schema.
-   * <p>
-   * The result is the schema to use when creating column writers: it reflects
-   * the type of the target vector. The reader is responsible for converting from
-   * the (possibly different) reader column type to the provided column type.
-   * <p>
-   * Note: the provided schema should only contain types that the reader is prepared
-   * to offer: there is no requirement that the reader support every possible conversion,
-   * only those that make sense for that one reader.
-   *
-   * @param providedSchema the provided schema from {@code CREATE SCHEMA}
-   * @param readerSchema the set of column types that the reader can provide
-   * "natively"
-   * @return a merged schema to use when creating the {@code ResultSetLoader}
-   */
-  public static TupleMetadata mergeSchemas(TupleMetadata providedSchema,
-      TupleMetadata readerSchema) {
-    if (providedSchema == null) {
-      return readerSchema;
-    }
-    final TupleMetadata tableSchema = new TupleSchema();
-    for (ColumnMetadata readerCol : readerSchema) {
-      final ColumnMetadata providedCol = providedSchema.metadata(readerCol.name());
-      tableSchema.addColumn(providedCol == null ? readerCol : providedCol);
-    }
-    if (providedSchema.hasProperties()) {
-      tableSchema.properties().putAll(providedSchema.properties());
-    }
-    return tableSchema;
+        inputSchema.hasProperties() ? inputSchema.properties() : null);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/FixedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/FixedReceiver.java
new file mode 100644
index 0000000..923214a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/FixedReceiver.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleNameSpace;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Layer above the {@code ResultSetLoader} which handles standard conversions
+ * for scalar columns where the schema is known up front (i.e. "early schema").
+ * Column access is by both name and position, though access by position is
+ * faster and is preferred where possible for performance.
+ */
+public class FixedReceiver {
+  private static final Logger logger = LoggerFactory.getLogger(FixedReceiver.class);
+
+  public static class Builder {
+    private final SchemaNegotiator negotiator;
+    private final TupleMetadata providedSchema;
+    private final StandardConversions.Builder conversionBuilder = StandardConversions.builder();
+    private boolean isComplete;
+    private RowSetLoader rowWriter;
+
+    public Builder(SchemaNegotiator negotiator) {
+      this.negotiator = negotiator;
+      this.providedSchema = negotiator.providedSchema();
+    }
+
+    /**
+     * Provides access to the conversion builder to add custom properties.
+     */
+    public StandardConversions.Builder conversionBuilder() {
+      return conversionBuilder;
+    }
+
+    /**
+     * Mark that the reader schema provided to {@link #build(TupleMetadata)}
+     * contains all columns that this reader will deliver. Allows some
+     * optimizations. See {@link SchemaNegotiator#schemaIsComplete(boolean)}.
+     */
+    public Builder schemaIsComplete() {
+      isComplete = true;
+      return this;
+    }
+
+    /**
+     * Create a fixed receiver for the provided schema (if any) in the
+     * scan plan, and the given reader schema. Assumes no new columns will
+     * be added later in the read.
+     */
+    public FixedReceiver build(TupleMetadata readerSchema) {
+      StandardConversions conversions = conversionBuilder.build();
+      TupleMetadata writerSchema = mergeSchemas(negotiator.providedSchema(), readerSchema);
+      negotiator.tableSchema(writerSchema);
+      negotiator.schemaIsComplete(isComplete);
+      ResultSetLoader loader = negotiator.build();
+      rowWriter = loader.writer();
+      TupleNameSpace<ValueWriter> writers = new TupleNameSpace<>();
+      for (ColumnMetadata col : readerSchema) {
+        writers.add(col.name(), writerFor(col, conversions));
+      }
+      return new FixedReceiver(rowWriter, writers);
+    }
+
+    /**
+     * Given a desired provided schema and an actual reader schema, create a merged
+     * schema that contains the provided column where available, but the reader
+     * column otherwise. Copies provided properties to the output schema.
+     * <p>
+     * The result is the schema to use when creating column writers: it reflects
+     * the type of the target vector. The reader is responsible for converting from
+     * the (possibly different) reader column type to the provided column type.
+     * <p>
+     * Note: the provided schema should only contain types that the reader is prepared
+     * to offer: there is no requirement that the reader support every possible conversion,
+     * only those that make sense for that one reader.
+     *
+     * @param providedSchema the provided schema from {@code CREATE SCHEMA}
+     * @param readerSchema the set of column types that the reader can provide
+     * "natively"
+     * @return a merged schema to use when creating the {@code ResultSetLoader}
+     */
+    public static TupleMetadata mergeSchemas(TupleMetadata providedSchema,
+        TupleMetadata readerSchema) {
+      if (providedSchema == null) {
+        return readerSchema;
+      }
+      final TupleMetadata tableSchema = new TupleSchema();
+      for (ColumnMetadata readerCol : readerSchema) {
+        final ColumnMetadata providedCol = providedSchema.metadata(readerCol.name());
+        tableSchema.addColumn(providedCol == null ? readerCol : providedCol);
+      }
+      if (providedSchema.hasProperties()) {
+        tableSchema.properties().putAll(providedSchema.properties());
+      }
+      return tableSchema;
+    }
+
+    private ValueWriter writerFor(ColumnMetadata readerCol, StandardConversions conversions) {
+      if (!MetadataUtils.isScalar(readerCol)) {
+        throw UserException.internalError()
+            .message("FixedReceiver only works with scalar columns, reader column is not scalar")
+            .addContext("Column name", readerCol.name())
+            .addContext("Column type", readerCol.type().name())
+            .addContext(errorContext())
+            .build(logger);
+      }
+      ScalarWriter baseWriter = rowWriter.scalar(readerCol.name());
+      if (!rowWriter.isProjected(readerCol.name())) {
+        return baseWriter;
+      }
+      ColumnMetadata providedCol = providedCol(readerCol.name());
+      if (providedCol == null) {
+        return baseWriter;
+      }
+      if (!MetadataUtils.isScalar(providedCol)) {
+        throw UserException.validationError()
+          .message("FixedReceiver only works with scalar columns, provided column is not scalar")
+          .addContext("Provided column name", providedCol.name())
+          .addContext("Provided column type", providedCol.type().name())
+          .addContext(errorContext())
+          .build(logger);
+      }
+      if (!compatibleModes(readerCol.mode(), providedCol.mode())) {
+        throw UserException.validationError()
+          .message("Reader and provided columns have incompatible cardinality")
+          .addContext("Column name", providedCol.name())
+          .addContext("Provided column mode", providedCol.mode().name())
+          .addContext("Reader column mode", readerCol.mode().name())
+          .addContext(errorContext())
+          .build(logger);
+      }
+      return conversions.converterFor(baseWriter, readerCol.type());
+    }
+
+    private boolean compatibleModes(DataMode source, DataMode dest) {
+      return source == dest ||
+             dest == DataMode.OPTIONAL && source == DataMode.REQUIRED;
+    }
+
+    private ColumnMetadata providedCol(String name) {
+      return providedSchema == null ? null : providedSchema.metadata(name);
+    }
+
+    private CustomErrorContext errorContext() {
+      return negotiator.errorContext();
+    }
+  }
+
+  private final RowSetLoader rowWriter;
+  private final TupleNameSpace<ValueWriter> writers;
+
+  private FixedReceiver(RowSetLoader rowWriter,
+      TupleNameSpace<ValueWriter> writers) {
+    this.rowWriter = rowWriter;
+    this.writers = writers;
+  }
+
+  public static Builder builderFor(SchemaNegotiator negotiator) {
+    return new Builder(negotiator);
+  }
+
+  public boolean start() {
+    return rowWriter.start();
+  }
+
+  public ValueWriter scalar(int index) {
+    return writers.get(index);
+  }
+
+  public ValueWriter scalar(String name) {
+    return writers.get(name);
+  }
+
+  public void save() {
+    rowWriter.save();
+  }
+
+  public RowSetLoader rowWriter() { return rowWriter; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
index f4393f1..393fc23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
@@ -24,12 +24,17 @@
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.protocol.OperatorDriver;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanEventListener;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanLifecycle;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
@@ -80,6 +85,11 @@
     }
   }
 
+  public interface SchemaValidator {
+    void validate(ScanSchemaTracker schema);
+  }
+
+  private OptionSet options;
   private ReaderFactory<?> readerFactory;
   protected String userName;
   protected MajorType nullType;
@@ -133,10 +143,24 @@
   protected boolean allowSchemaChange = true;
 
   /**
+   * Optional schema validator to perform per-scan checks of the
+   * projection or resolved schema.
+   */
+  protected SchemaValidator schemaValidator;
+
+  /**
    * Context for error messages.
    */
   protected CustomErrorContext errorContext;
 
+  public void options(OptionSet options) {
+    this.options = options;
+  }
+
+  public OptionSet options() {
+    return options;
+  }
+
   public void readerFactory(ReaderFactory<?> readerFactory) {
     this.readerFactory = readerFactory;
   }
@@ -258,6 +282,12 @@
     return readerFactory;
   }
 
+  public void schemaValidator(SchemaValidator schemaValidator) {
+    this.schemaValidator = schemaValidator;
+  }
+
+  public SchemaValidator schemaValidator() { return schemaValidator; }
+
   public ScanLifecycle build(OperatorContext context) {
     return new ScanLifecycle(context, this);
   }
@@ -266,6 +296,10 @@
   public ScanOperatorExec buildScan() {
     return new ScanOperatorExec(
         new ScanEventListener(this),
-        !disableEmptyResults);
+             !disableEmptyResults);
+  }
+
+  public OperatorRecordBatch buildScanOperator(FragmentContext fragContext, PhysicalOperator pop) {
+    return new OperatorRecordBatch(fragContext, pop, buildScan(), enableSchemaBatch);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java
index 01d9b4c..9dee1d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java
@@ -19,6 +19,7 @@
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -116,6 +117,13 @@
   void setErrorContext(CustomErrorContext context);
 
   /**
+   * Returns the error context to use for this reader: either the
+   * parent or the reader-specific context set in
+   * {@link #setErrorContext(CustomErrorContext)}.
+   */
+  CustomErrorContext errorContext();
+
+  /**
    * Name of the user running the query.
    */
   String userName();
@@ -134,6 +142,8 @@
    */
   boolean isProjectionEmpty();
 
+  ProjectedColumn projectionFor(String colName);
+
   /**
    * Returns the provided schema, if defined. The provided schema is a
    * description of the source schema viewed as a Drill schema.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java
index bb92772..c6fff11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileDescrip.java
@@ -17,10 +17,18 @@
  */
 package org.apache.drill.exec.physical.impl.scan.v3.file;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
 
 /**
+ * Describes one file within a scan and is used to populate implicit columns.
  * Specify the file name and optional selection root. If the selection root
  * is provided, then partitions are defined as the portion of the file name
  * that is not also part of the selection root. That is, if selection root is
@@ -28,13 +36,35 @@
  */
 public class FileDescrip {
 
-  private final Path filePath;
+  private final DrillFileSystem dfs;
+  private final FileWork fileWork;
+  private final FileSplit split;
   private final String[] dirPath;
 
-  public FileDescrip(Path filePath, Path selectionRoot) {
-    this.filePath = filePath;
+  // Option to open the file as optionally compressed
+  private boolean isCompressible;
+
+  // Parquet-related attributes
+  protected Integer rowGroupIndex;
+  protected Long rowGroupStart;
+  protected Long rowGroupLength;
+
+  // Cached modification time. Cached as a string because
+  // that's the odd way we return the value.
+  private String modTime;
+
+  // Flag to indicate that the file turned out to be empty.
+  // Used to set one of the internal implicit columns.
+  protected boolean isEmpty;
+
+  public FileDescrip(DrillFileSystem dfs, FileWork fileWork, Path selectionRoot) {
+    this.dfs = dfs;
+    this.fileWork = fileWork;
+    Path path = dfs.makeQualified(fileWork.getPath());
+    this.split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
 
     // If the data source is not a file, no file metadata is available.
+    Path filePath = fileWork.getPath();
     if (selectionRoot == null || filePath == null) {
       dirPath = null;
       return;
@@ -57,7 +87,25 @@
     }
   }
 
-  public Path filePath() { return filePath; }
+  /**
+   * Gives the Drill file system for this operator.
+   */
+  public DrillFileSystem fileSystem() { return dfs; }
+
+  /**
+   * Returns Drill's version of the Hadoop file split.
+   */
+  public Path filePath() { return fileWork.getPath(); }
+
+  /**
+   * Describes the file split (path and block offset) for this scan.
+   *
+   * @return Hadoop file split object with the file path, block
+   * offset, and length.
+   */
+  public FileSplit split() { return split; }
+
+  public FileWork fileWork() { return fileWork; }
 
   public String partition(int index) {
     if (dirPath == null ||  dirPath.length <= index) {
@@ -70,5 +118,49 @@
     return dirPath == null ? 0 : dirPath.length;
   }
 
-  public boolean isSet() { return filePath != null; }
+  public void setRowGroupAttribs(int index, long start, long length) {
+    this.rowGroupIndex = index;
+    this.rowGroupStart = start;
+    this.rowGroupLength = length;
+  }
+
+  public String getModTime() {
+    if (modTime == null) {
+      try {
+        modTime = String.valueOf(dfs.getFileStatus(filePath()).getModificationTime());
+      } catch (IOException e) {
+
+        // This is an odd place to catch and report errors. Assume that, if the file
+        // has problems, the call to open the file will fail and will return a better
+        // error message than we can provide here.
+      }
+    }
+    return modTime;
+  }
+
+  /**
+   * Explicitly set the cached modification time. For testing only.
+   */
+  @VisibleForTesting
+  public void setModTime(String modTime) {
+    this.modTime = modTime;
+  }
+
+  public void setCompressible(boolean isCompressed) {
+    this.isCompressible = isCompressed;
+  }
+
+  public boolean isCompressible() { return isCompressible; }
+
+  public InputStream open() throws IOException {
+    if (isCompressible) {
+      return dfs.openPossiblyCompressedStream(filePath());
+    } else {
+      return dfs.open(filePath());
+    }
+  }
+
+  public void markEmpty() {
+    isEmpty = true;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java
index eb4fc07..0db2cdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycle.java
@@ -77,9 +77,10 @@
 
     // Create the implicit columns manager
     this.implicitColumnsHandler = new ImplicitFileColumnsHandler(
-        context, options, vectorCache(), schemaTracker());
+        dfs, context.getFragmentContext().getOptions(),
+        options, vectorCache(), schemaTracker());
 
-    // Bind the reader factory which intializes the list
+    // Bind the reader factory which initializes the list
     // of splits from the builder.
     FileReaderFactory readerFactory = (FileReaderFactory) readerFactory();
     readerFactory.bind(this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java
index a791a35..85f9dbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileScanLifecycleBuilder.java
@@ -17,22 +17,25 @@
  */
 package org.apache.drill.exec.physical.impl.scan.v3.file;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ScanLifecycle;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 public class FileScanLifecycleBuilder extends ScanLifecycleBuilder {
   protected int maxPartitionDepth;
-  protected boolean useLegacyWildcardExpansion;
+  protected boolean useLegacyWildcardExpansion = true;
   protected Path rootDir;
   private List<FileWork> splits;
   private Configuration fsConf;
+  private boolean compressible;
 
   public void fileSystemConfig(Configuration fsConf) {
     this.fsConf = fsConf;
@@ -42,6 +45,14 @@
     this.splits = splits;
   }
 
+  /**
+   * Legacy version because the file scan operator exposes the
+   * implementation, not the interface.
+   */
+  public void fileSplitImpls(List<FileWorkImpl> splits) {
+    this.splits = new ArrayList<>(splits);
+  }
+
   public void maxPartitionDepth(int maxPartitionDepth) {
     this.maxPartitionDepth = maxPartitionDepth;
   }
@@ -54,6 +65,10 @@
     this.rootDir = rootDir;
   }
 
+  public void compressible(boolean compressible) {
+    this.compressible = compressible;
+  }
+
   public List<FileWork> splits() {
     return Preconditions.checkNotNull(splits);
   }
@@ -65,20 +80,16 @@
     return fsConf;
   }
 
+  public int maxPartitionDepth() { return maxPartitionDepth; }
+
+  public boolean useLegacyWildcardExpansion() { return useLegacyWildcardExpansion; }
+
+  public Path rootDir() { return rootDir; }
+
+  public boolean isCompressible() { return compressible; }
+
   @Override
   public ScanLifecycle build(OperatorContext context) {
     return new FileScanLifecycle(context, this);
   }
-
-  public int maxPartitionDepth() {
-    return maxPartitionDepth;
-  }
-
-  public boolean useLegacyWildcardExpansion() {
-    return useLegacyWildcardExpansion;
-  }
-
-  public Path rootDir() {
-    return rootDir;
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java
index 156b5e4..86dd2b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiator.java
@@ -18,9 +18,6 @@
 package org.apache.drill.exec.physical.impl.scan.v3.file;
 
 import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.hadoop.mapred.FileSplit;
 
 /**
  * The file schema negotiator provides access to the Drill file system
@@ -29,20 +26,9 @@
 public interface FileSchemaNegotiator extends SchemaNegotiator {
 
   /**
-   * Gives the Drill file system for this operator.
+   * Gives the file description which holds the Drill file system,
+   * split, file work and format-specific properties. It can open the
+   * file and provides the information used to populate implicit columns.
    */
-  DrillFileSystem fileSystem();
-
-  /**
-   * Describes the file split (path and block offset) for this scan.
-   *
-   * @return Hadoop file split object with the file path, block
-   * offset, and length.
-   */
-  FileSplit split();
-
-  /**
-   * Returns Drill's version of the Hadoop file split.
-   */
-  FileWork fileWork();
+  FileDescrip file();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java
index 8457b2f..9d69a97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/FileSchemaNegotiatorImpl.java
@@ -21,14 +21,13 @@
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException.Builder;
 import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
-import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ReaderLifecycle;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.SchemaNegotiatorImpl;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
 /**
@@ -49,7 +48,8 @@
     @Override
     public void addContext(Builder builder) {
       super.addContext(builder);
-      builder.addContext("File", Path.getPathWithoutSchemeAndAuthority(split.getPath()).toString());
+      FileSplit split = fileDescrip.split();
+      builder.addContext("File", split.getPath().toString());
       if (split.getStart() != 0) {
         builder.addContext("Offset", split.getStart());
         builder.addContext("Length", split.getLength());
@@ -57,8 +57,7 @@
     }
   }
 
-  private FileWork fileWork;
-  private FileSplit split;
+  private FileDescrip fileDescrip;
 
   public FileSchemaNegotiatorImpl(ReaderLifecycle readerLifecycle) {
     super(readerLifecycle);
@@ -67,19 +66,11 @@
   }
 
   public void bindSplit(FileWork fileWork) {
-    this.fileWork = fileWork;
-    Path path = fileScan().fileSystem().makeQualified(fileWork.getPath());
-    split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
+    fileDescrip = fileScan().implicitColumnsHandler().makeDescrip(fileWork);
   }
 
   @Override
-  public DrillFileSystem fileSystem() { return fileScan().fileSystem(); }
-
-  @Override
-  public FileSplit split() { return split; }
-
-  @Override
-  public FileWork fileWork() { return fileWork; }
+  public FileDescrip file() { return fileDescrip; }
 
   @Override
   @SuppressWarnings("unchecked")
@@ -89,10 +80,32 @@
 
   @Override
   public StaticBatchBuilder implicitColumnsLoader() {
-    return fileScan().implicitColumnsHandler().forFile(split.getPath());
+    return fileScan().implicitColumnsHandler().forFile(fileDescrip);
   }
 
   private FileScanLifecycle fileScan() {
     return (FileScanLifecycle) readerLifecycle.scanLifecycle();
   }
+
+  @Override
+  protected void onEndBatch() {
+
+    // If this is a metadata scan, and this file has no rows (this is
+    // the first batch and contains no data), then add a dummy row so
+    // we have something to aggregate upon.
+    ImplicitFileColumnsHandler handler = fileScan().implicitColumnsHandler();
+    if (!handler.isMetadataScan()) {
+      return;
+    }
+    ResultSetLoader tableLoader = readerLifecycle.tableLoader();
+    if (tableLoader.batchCount() == 0 && !tableLoader.hasRows()) {
+
+      // This is admittedly a hack. The table may contain non-nullable
+      // columns, but we are asking for null values for those columns.
+      // We'll fill in defaults, which is not ideal.
+      tableLoader.writer().start();
+      tableLoader.writer().save();
+      fileDescrip.markEmpty();
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnMarker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnMarker.java
new file mode 100644
index 0000000..e8c9790
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnMarker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3.file;
+
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitInternalFileColumns;
+
+/**
+ * Marks a column as implicit and provides a function to resolve an
+ * implicit column given a description of the input file.
+ */
+public interface ImplicitColumnMarker {
+  String resolve(FileDescrip fileInfo);
+
+  /**
+   * Marker for a file-based, non-internal implicit column that
+   * extracts parts of the file name as defined by the implicit
+   * column definition.
+   */
+  public class FileImplicitMarker implements ImplicitColumnMarker {
+    public final ImplicitFileColumns defn;
+
+    public FileImplicitMarker(ImplicitFileColumns defn) {
+      this.defn = defn;
+    }
+
+    @Override
+    public String resolve(FileDescrip fileInfo) {
+      return defn.getValue(fileInfo.filePath());
+    }
+  }
+
+  /**
+   * Partition column defined by a partition depth from the scan
+   * root folder. Partitions that reference non-existent directory levels
+   * are null.
+   */
+  public class PartitionColumnMarker implements ImplicitColumnMarker {
+    private final int partition;
+
+    public PartitionColumnMarker(int partition) {
+      this.partition = partition;
+    }
+
+    @Override
+    public String resolve(FileDescrip fileInfo) {
+      return fileInfo.partition(partition);
+    }
+  }
+
+  public class InternalColumnMarker implements ImplicitColumnMarker {
+    public final ImplicitInternalFileColumns defn;
+
+    public InternalColumnMarker(ImplicitInternalFileColumns defn) {
+      this.defn = defn;
+    }
+
+    @Override
+    public String resolve(FileDescrip fileInfo) {
+      switch (defn) {
+      case ROW_GROUP_INDEX:
+        return valueOf(fileInfo.rowGroupIndex);
+      case ROW_GROUP_START:
+        return valueOf(fileInfo.rowGroupStart);
+      case ROW_GROUP_LENGTH:
+        return valueOf(fileInfo.rowGroupLength);
+      case PROJECT_METADATA:
+      case USE_METADATA:
+
+        // Per Metadata code: if file is empty (and record is a placeholder)
+        // return "false", else return null for valid rows.
+        return fileInfo.isEmpty ? Boolean.FALSE.toString() : null;
+      case LAST_MODIFIED_TIME:
+        return fileInfo.getModTime();
+      default:
+        throw new IllegalStateException(defn.name());
+      }
+    }
+
+    private String valueOf(Object value) {
+      return value == null ? null : String.valueOf(value);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ImplicitColumnResolver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnResolver.java
similarity index 71%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ImplicitColumnResolver.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnResolver.java
index fcc3ed8..3ec7f35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ImplicitColumnResolver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitColumnResolver.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.scan.v3.schema;
+package org.apache.drill.exec.physical.impl.scan.v3.file;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -33,13 +33,22 @@
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.FileImplicitMarker;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.InternalColumnMarker;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker.PartitionColumnMarker;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema.ColumnHandle;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.SchemaUtils;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumn;
 import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitInternalFileColumns;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +92,8 @@
      */
     protected boolean useLegacyWildcardExpansion = true;
 
+    protected DrillFileSystem dfs;
+
     public ImplicitColumnOptions optionSet(OptionSet optionSet) {
       this.optionSet = optionSet;
       return this;
@@ -111,47 +122,10 @@
       useLegacyWildcardExpansion = flag;
       return this;
     }
-  }
 
-  /**
-   * Provides a function to resolve an implicit column given a description
-   * of the input file.
-   */
-  public interface ColumnMarker {
-    String resolve(FileDescrip fileInfo);
-  }
-
-  /**
-   * Implicit column defined by an {@link ImplicitFileColumns}.
-   */
-  public static class ImplicitColumnMarker implements ColumnMarker {
-    private final ImplicitFileColumns defn;
-
-    public ImplicitColumnMarker(ImplicitFileColumns defn) {
-      this.defn = defn;
-    }
-
-    @Override
-    public String resolve(FileDescrip fileInfo) {
-      return defn.getValue(fileInfo.filePath());
-    }
-  }
-
-  /**
-   * Partition column defined by a partition depth from the scan
-   * root folder. Partitions that reference non-existent directory levels
-   * are null.
-   */
-  public static class PartitionColumnMarker implements ColumnMarker {
-    private final int partition;
-
-    private PartitionColumnMarker(int partition) {
-      this.partition = partition;
-    }
-
-    @Override
-    public String resolve(FileDescrip fileInfo) {
-      return fileInfo.partition(partition);
+    public ImplicitColumnOptions dfs(DrillFileSystem dfs) {
+      this.dfs = dfs;
+      return this;
     }
   }
 
@@ -161,16 +135,20 @@
    * column markers which resolve the columns for each file.
    */
   public static class ParseResult {
-    private final List<ColumnMarker> columns;
+    private final List<ImplicitColumnMarker> columns;
     private final TupleMetadata schema;
+    private final boolean isMetadataScan;
 
-    protected ParseResult(List<ColumnMarker> columns, TupleMetadata schema) {
+    protected ParseResult(List<ImplicitColumnMarker> columns, TupleMetadata schema,
+        boolean isMetadataScan) {
       this.columns = columns;
       this.schema = schema;
+      this.isMetadataScan = isMetadataScan;
     }
 
     public TupleMetadata schema() { return schema; }
-    public List<ColumnMarker> columns() { return columns; }
+    public List<ImplicitColumnMarker> columns() { return columns; }
+    public boolean isMetadataScan() { return isMetadataScan; }
 
     public Object[] resolve(FileDescrip fileInfo) {
       Object values[] = new Object[columns.size()];
@@ -185,8 +163,9 @@
     private final ImplicitColumnResolver parser;
     private final ScanSchemaTracker tracker;
     private final MutableTupleSchema scanSchema;
-    private final List<ColumnMarker> columns = new ArrayList<>();
+    private final List<ImplicitColumnMarker> columns = new ArrayList<>();
     private final Set<Integer> referencedPartitions = new HashSet<>();
+    private boolean isMetadataScan;
 
     protected ImplicitColumnParser(ImplicitColumnResolver parser, ScanSchemaTracker tracker) {
       this.parser = parser;
@@ -196,7 +175,7 @@
 
     protected ParseResult parse() {
       for (ColumnHandle col : tracker.internalSchema().columns()) {
-        matchColumn(parser, col);
+        matchColumn(col);
       }
       if (tracker.internalSchema().projectionType() == ScanSchemaTracker.ProjectionType.ALL) {
         expandWildcard();
@@ -207,7 +186,7 @@
       // appears out-of-order:
       // SELECT *, fileName
       // SELECT fileName, *
-      return new ParseResult(columns, tracker.applyImplicitCols());
+      return new ParseResult(columns, tracker.applyImplicitCols(), isMetadataScan);
     }
 
     private void expandWildcard() {
@@ -224,21 +203,21 @@
         if (referencedPartitions.contains(i)) {
           continue;
         }
+        ImplicitColumnMarker marker = new PartitionColumnMarker(i);
         ColumnMetadata resolved = MetadataUtils.newScalar(parser.partitionName(i), PARTITION_COL_TYPE);
-        SchemaUtils.markAsPartition(resolved, i);
-        columns.add(new PartitionColumnMarker(i));
-        tracker.expandImplicitCol(resolved);
+        columns.add(marker);
+        tracker.expandImplicitCol(resolved, marker);
         referencedPartitions.add(i);
       }
     }
 
-    private void matchColumn(ImplicitColumnResolver parser, ColumnHandle col) {
+    private void matchColumn(ColumnHandle col) {
       String colType = SchemaUtils.implicitColType(col.column());
       if (colType != null) {
         resolveTaggedColumn(parser, col, colType);
         return;
       } else if (col.column().isDynamic()) {
-        matchByName(parser, col);
+        matchByName(col);
       }
     }
 
@@ -250,22 +229,23 @@
         return;
       }
 
-      ImplicitFileColumns defn = parser.typeDefs.get(colType);
+      ImplicitFileColumn defn = parser.typeDefs.get(colType);
       if (defn != null) {
-        resolveImplicitColumn(defn, parser, col);
+        resolveImplicitColumn((ImplicitFileColumns) defn, col, colType);
         return;
       }
-      resolveUnknownColumn(parser, col, colType);
+      resolveUnknownColumn(col, colType);
     }
 
-    private void resolvePartitionColumn(Matcher m, ImplicitColumnResolver parser, ColumnHandle col) {
+    private void resolvePartitionColumn(Matcher m, ImplicitColumnResolver parser,
+        ColumnHandle col) {
 
       // The provided schema column must be of the correct type and mode.
       ColumnMetadata colSchema = col.column();
       if (colSchema.type() != MinorType.VARCHAR ||
           colSchema.mode() != DataMode.OPTIONAL) {
         throw UserException.validationError()
-            .message("Provided column %s is marked as a parition column, but is of the wrong type",
+            .message("Provided column `%s` is marked as a partition column, but is of the wrong type",
                 colSchema.columnString())
             .addContext("Expected type", MinorType.VARCHAR.name())
             .addContext("Expected cardinality", DataMode.OPTIONAL.name())
@@ -275,22 +255,21 @@
 
       // Partition column
       int partitionIndex = Integer.parseInt(m.group(1));
-      columns.add(new PartitionColumnMarker(partitionIndex));
-      col.markImplicit();
+      markImplicit(col, new PartitionColumnMarker(partitionIndex));
 
       // Remember the partition for later wildcard expansion
       referencedPartitions.add(partitionIndex);
     }
 
     private void resolveImplicitColumn(ImplicitFileColumns defn,
-        ImplicitColumnResolver parser, ColumnHandle col) {
+        ColumnHandle col, String colType) {
 
       // The provided schema column must be of the correct type and mode.
       ColumnMetadata colSchema = col.column();
       if (colSchema.type() != MinorType.VARCHAR ||
           colSchema.mode() == DataMode.REPEATED) {
         throw UserException.validationError()
-            .message("Provided column %s is marked as an implicit column '%s', but is of the wrong type",
+            .message("Provided column `%s` is marked as implicit '%s', but is of the wrong type",
                 colSchema.columnString(), defn.propertyValue())
             .addContext("Expected type", MinorType.VARCHAR.name())
             .addContext("Expected cardinality", String.format("%s or %s",
@@ -298,12 +277,15 @@
             .addContext(parser.errorContext)
             .build(logger);
       }
-      columns.add(new ImplicitColumnMarker(defn));
-      col.markImplicit();
+      markImplicit(col, new FileImplicitMarker(defn));
     }
 
-    private void resolveUnknownColumn(ImplicitColumnResolver parser,
-        ColumnHandle col, String colType) {
+    private void markImplicit(ColumnHandle col, ImplicitColumnMarker marker) {
+      columns.add(marker);
+      col.markImplicit(marker);
+    }
+
+    private void resolveUnknownColumn(ColumnHandle col, String colType) {
       throw UserException.validationError()
           .message("Provided column %s references an undefined implicit column type '%s'",
               col.column().columnString(), colType)
@@ -314,20 +296,20 @@
           .build(logger);
     }
 
-    private void matchByName(ImplicitColumnResolver parser, ColumnHandle col) {
+    private void matchByName(ColumnHandle col) {
       Matcher m = parser.partitionPattern.matcher(col.name());
       if (m.matches()) {
-        buildPartitionColumn(m, parser, col);
+        buildPartitionColumn(m, col);
         return;
       }
 
-      ImplicitFileColumns defn = parser.colDefs.get(col.name());
+      ImplicitFileColumn defn = parser.colDefs.get(col.name());
       if (defn != null) {
-        buildImplicitColumn(defn, parser, col);
+        buildImplicitColumn(defn, col);
       }
     }
 
-    private void buildPartitionColumn(Matcher m, ImplicitColumnResolver parser, ColumnHandle col) {
+    private void buildPartitionColumn(Matcher m, ColumnHandle col) {
 
       // If the projected column is a map or array, then it shadows the
       // partition column. Example: dir0.x, dir0[2].
@@ -340,17 +322,21 @@
 
       // Partition column
       int partitionIndex = Integer.parseInt(m.group(1));
-      ColumnMetadata resolved = MetadataUtils.newScalar(col.name(), PARTITION_COL_TYPE);
-      SchemaUtils.markAsPartition(resolved, partitionIndex);
-      columns.add(new PartitionColumnMarker(partitionIndex));
-      scanSchema.resolveImplicit(col, resolved);
+      resolve(col,
+          MetadataUtils.newScalar(col.name(), PARTITION_COL_TYPE),
+          new PartitionColumnMarker(partitionIndex));
 
       // Remember the partition for later wildcard expansion
       referencedPartitions.add(partitionIndex);
     }
 
-    private void buildImplicitColumn(ImplicitFileColumns defn,
-        ImplicitColumnResolver parser, ColumnHandle col) {
+    private void resolve(ColumnHandle col, ColumnMetadata resolved, ImplicitColumnMarker marker) {
+      columns.add(marker);
+      scanSchema.resolveImplicit(col, resolved, marker);
+    }
+
+    private void buildImplicitColumn(ImplicitFileColumn defn,
+        ColumnHandle col) {
 
       // If the projected column is a map or array, then it shadows the
       // metadata column. Example: filename.x, filename[2].
@@ -358,31 +344,58 @@
       if (!projCol.isSimple()) {
         logger.warn("Projected column {} shadows implicit column {}",
             projCol.projectString(), col.name());
+      } else if (defn instanceof ImplicitInternalFileColumns) {
+        resolveInternalColumn(col, (ImplicitInternalFileColumns) defn);
       } else {
-        ColumnMetadata resolved = MetadataUtils.newScalar(col.name(), IMPLICIT_COL_TYPE);
-        SchemaUtils.markImplicit(resolved, defn.propertyValue());
-        columns.add(new ImplicitColumnMarker(defn));
-        scanSchema.resolveImplicit(col, resolved);
+        resolve(col,
+            MetadataUtils.newScalar(col.name(), IMPLICIT_COL_TYPE),
+            new FileImplicitMarker((ImplicitFileColumns) defn));
       }
     }
+
+    private void resolveInternalColumn(ColumnHandle col,
+        ImplicitInternalFileColumns defn) {
+
+      // Tests may omit the DFS; production code must provide it.
+      if (defn == ImplicitInternalFileColumns.LAST_MODIFIED_TIME &&
+          parser.dfs == null) {
+        throw new IllegalStateException(
+            "Must provide a file system to use " + defn.name());
+      }
+
+      // Check if this is an implied metadata scan
+      if (defn == ImplicitInternalFileColumns.PROJECT_METADATA) {
+        isMetadataScan = true;
+      }
+
+      // TODO: Internal columns are VARCHAR for historical reasons.
+      // Better to use a type that fits each column's purpose.
+      resolve(col,
+          MetadataUtils.newScalar(col.name(),
+              defn.isOptional() ? OPTIONAL_INTERNAL_COL_TYPE : IMPLICIT_COL_TYPE),
+          new InternalColumnMarker(defn));
+    }
   }
 
   public static final MajorType IMPLICIT_COL_TYPE = Types.required(MinorType.VARCHAR);
   public static final MajorType PARTITION_COL_TYPE =  Types.optional(MinorType.VARCHAR);
+  public static final MajorType OPTIONAL_INTERNAL_COL_TYPE =  Types.optional(MinorType.VARCHAR);
 
   private final int maxPartitionDepth;
   private final boolean useLegacyWildcardExpansion;
   private final String partitionDesignator;
   private final Pattern partitionPattern;
   private final Pattern partitionTypePattern;
-  private final Map<String, ImplicitFileColumns> colDefs = CaseInsensitiveMap.newHashMap();
-  private final Map<String, ImplicitFileColumns> typeDefs = CaseInsensitiveMap.newHashMap();
+  private final Map<String, ImplicitFileColumn> colDefs = CaseInsensitiveMap.newHashMap();
+  private final Map<String, ImplicitFileColumn> typeDefs = CaseInsensitiveMap.newHashMap();
   private final CustomErrorContext errorContext;
+  private final DrillFileSystem dfs;
 
   public ImplicitColumnResolver(ImplicitColumnOptions options, CustomErrorContext errorContext) {
     this.errorContext = errorContext;
     this.maxPartitionDepth = options.maxPartitionDepth;
     this.useLegacyWildcardExpansion = options.useLegacyWildcardExpansion;
+    this.dfs = options.dfs;
     this.partitionDesignator = options.optionSet.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     this.partitionPattern = Pattern.compile(partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
     if (partitionDesignator.equals(ColumnMetadata.IMPLICIT_PARTITION_PREFIX)) {
@@ -391,12 +404,22 @@
       this.partitionTypePattern = Pattern.compile(ColumnMetadata.IMPLICIT_PARTITION_PREFIX + "(\\d+)",
           Pattern.CASE_INSENSITIVE);
     }
+
+    // File implicit columns: can be defined in the provided schema
     for (ImplicitFileColumns defn : ImplicitFileColumns.values()) {
       String colName = options.optionSet.getString(defn.optionName());
       if (!Strings.isNullOrEmpty(colName)) {
         this.colDefs.put(colName, defn);
       }
-      typeDefs.put(defn.propertyValue(), defn);
+      this.typeDefs.put(defn.propertyValue(), defn);
+    }
+
+    // Internal implicit cols: cannot be defined in the provided schema
+    for (ImplicitInternalFileColumns defn : ImplicitInternalFileColumns.values()) {
+      String colName = options.optionSet.getString(defn.optionName());
+      if (!Strings.isNullOrEmpty(colName)) {
+        this.colDefs.put(colName, defn);
+      }
     }
   }
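
Not part of the patch: a self-contained sketch of the partition-name matching that the resolver's partitionPattern performs, using the default `dir` designator (the real designator comes from the FILESYSTEM_PARTITION_COLUMN_LABEL option and is matched case-insensitively). The class and method names below are illustrative only.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only; mirrors the partitionPattern logic above.
public class PartitionNameSketch {

  // Default designator "dir"; the resolver builds this from a session option.
  private static final Pattern PARTITION_PATTERN =
      Pattern.compile("dir(\\d+)", Pattern.CASE_INSENSITIVE);

  /** Returns the partition depth for names such as dir0 or DIR2, or -1 otherwise. */
  static int partitionIndex(String colName) {
    Matcher m = PARTITION_PATTERN.matcher(colName);
    return m.matches() ? Integer.parseInt(m.group(1)) : -1;
  }

  public static void main(String[] args) {
    System.out.println(partitionIndex("dir0"));     // 0
    System.out.println(partitionIndex("DIR2"));     // 2
    System.out.println(partitionIndex("filename")); // -1
  }
}
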
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java
index d6e7ef6..ee8e6b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/file/ImplicitFileColumnsHandler.java
@@ -19,15 +19,15 @@
 
 import java.util.List;
 
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ImplicitColumnOptions;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ParseResult;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder.RepeatedBatchBuilder;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ColumnMarker;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ImplicitColumnOptions;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ParseResult;
 import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
+import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -41,26 +41,37 @@
  */
 public class ImplicitFileColumnsHandler {
 
+  private final DrillFileSystem dfs;
   private final ImplicitColumnResolver parser;
   private final ResultVectorCache vectorCache;
   private final Path rootDir;
   private final ParseResult parseResult;
+  private final boolean isCompressible;
 
-  public ImplicitFileColumnsHandler(OperatorContext context, FileScanLifecycleBuilder scanOptions,
+  public ImplicitFileColumnsHandler(DrillFileSystem dfs, OptionSet options,
+      FileScanLifecycleBuilder scanOptions,
       ResultVectorCache vectorCache, ScanSchemaTracker schemaTracker) {
-    ImplicitColumnOptions options = new ImplicitColumnOptions()
-        .optionSet(context.getFragmentContext().getOptions())
+    ImplicitColumnOptions implicitOptions = new ImplicitColumnOptions()
+        .optionSet(options)
+        .dfs(dfs)
         .maxPartitionDepth(scanOptions.maxPartitionDepth())
         .useLegacyWildcardExpansion(scanOptions.useLegacyWildcardExpansion());
+    this.dfs = dfs;
     this.rootDir = scanOptions.rootDir();
-    this.parser = new ImplicitColumnResolver(options, scanOptions.errorContext());
+    this.parser = new ImplicitColumnResolver(implicitOptions, scanOptions.errorContext());
     this.vectorCache = vectorCache;
     this.parseResult = parser.parse(schemaTracker);
+    this.isCompressible = scanOptions.isCompressible();
   }
 
-  public StaticBatchBuilder forFile(Path filePath) {
-    FileDescrip fileInfo = new FileDescrip(filePath, rootDir);
-    List<ColumnMarker> columns = parseResult.columns();
+  public FileDescrip makeDescrip(FileWork fileWork) {
+    FileDescrip descrip = new FileDescrip(dfs, fileWork, rootDir);
+    descrip.setCompressible(isCompressible);
+    return descrip;
+  }
+
+  public StaticBatchBuilder forFile(FileDescrip fileInfo) {
+    List<ImplicitColumnMarker> columns = parseResult.columns();
     if (columns.isEmpty()) {
       return null;
     }
@@ -70,4 +81,6 @@
     }
     return new RepeatedBatchBuilder(vectorCache, parseResult.schema(), values);
   }
+
+  public boolean isMetadataScan() { return parseResult.isMetadataScan(); }
 }
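
The handler now splits per-file work into two steps: makeDescrip() wraps a FileWork in a FileDescrip (attaching the file system and the compressibility flag), and forFile() builds the static batch of implicit and partition columns for that file. Below is a minimal caller-side sketch, assuming the handler and file work already exist; the surrounding lifecycle wiring is omitted and the class and method names are illustrative.

import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitFileColumnsHandler;
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
import org.apache.drill.exec.store.dfs.easy.FileWork;

// Illustrative sketch only: the two-step per-file protocol shown above.
public class ImplicitColumnsSketch {

  static StaticBatchBuilder implicitBatchFor(ImplicitFileColumnsHandler handler, FileWork work) {
    // Describe the file, then build the implicit-columns batch loader.
    // forFile() returns null when no implicit or partition columns are projected.
    return handler.forFile(handler.makeDescrip(work));
  }
}
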
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
index e758fc3..f322f51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
@@ -34,70 +34,68 @@
 import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Manages the schema and batch construction for a managed reader.
- * Allows the reader itself to be as simple as possible. This class
- * implements the basic {@link RowBatchReader} protocol based on
- * three methods, and converts it to the two-method protocol of
- * the managed reader. The {@code open()} call of the
+ * Manages the schema and batch construction for a managed reader. Allows the
+ * reader itself to be as simple as possible. This class implements the basic
+ * {@link RowBatchReader} protocol based on three methods, and converts it to
+ * the two-method protocol of the managed reader. The {@code open()} call of the
  * {@code RowBatchReader} is combined with the constructor of the
- * {@link ManagedReader}, enforcing the rule that the managed reader
- * is created just-in-time when it is to be used, which avoids
- * accidentally holding resources for the life of the scan.
+ * {@link ManagedReader}, enforcing the rule that the managed reader is created
+ * just-in-time when it is to be used, which avoids accidentally holding
+ * resources for the life of the scan. Also allows most of the reader's fields
+ * to be {@code final}.
  * <p>
- * Coordinates the components that wrap a reader to create the final
- * output batch:
+ * Coordinates the components that wrap a reader to create the final output
+ * batch:
  * <ul>
- * <li>The actual reader which load (possibly a subset of) the
- * columns requested from the input source.</li>
- * <li>Implicit columns manager instance which populates implicit
- * file columns, partition columns, and Drill's internal metadata
- * columns.</li>
- * <li>The missing columns handler which "makes up" values for projected
- * columns not read by the reader.</li>
- * <li>Batch assembler, which combines the three sources of vectors
- * to create the output batch with the schema specified by the
- * schema tracker.</li>
+ * <li>The actual reader which loads (possibly a subset of) the columns requested
+ * from the input source.</li>
+ * <li>Implicit columns manager instance which populates implicit file columns,
+ * partition columns, and Drill's internal implicit columns.</li>
+ * <li>The missing columns handler which "makes up" values for projected columns
+ * not read by the reader.</li>
+ * <li>Batch assembler, which combines the three sources of vectors to create
+ * the output batch with the schema specified by the schema tracker.</li>
  * </ul>
  * <p>
  * This class coordinates the reader-visible aspects of the scan:
  * <ul>
- * <li>The {@link SchemaNegotiator} (or subclass) which provides
- * schema-related input to the reader and which creates the reader's
- * {@link ResultSetLoader}, among other tasks. The schema negotiator
- * is specific to each kind of scan and is thus created via the
- * {@link ScanLifecycleBuilder}.</li>
- * <li>The reader, which is designed to be as simple as possible,
- * with all generic overhead tasks handled by this "shim" between
- * the scan operator and the actual reader implementation.</li>
+ * <li>The {@link SchemaNegotiator} (or subclass) which provides schema-related
+ * input to the reader and which creates the reader's {@link ResultSetLoader},
+ * among other tasks. The schema negotiator is specific to each kind of scan and
+ * is thus created via the {@link ScanLifecycleBuilder}.</li>
+ * <li>The reader, which is designed to be as simple as possible, with all
+ * generic overhead tasks handled by this "shim" between the scan operator and
+ * the actual reader implementation.</li>
  * </ul>
  * <p>
- * The reader is schema-driven. See {@link ScanSchemaTracker} for
- * an overview.
+ * The reader is schema-driven. See {@link ScanSchemaTracker} for an overview.
  * <ul>
- * <li>The reader is given a <i>reader input schema</i>, via the
- * schema negotiator, which specifies the desired output schema.
- * The schema can be fully dynamic (a wildcard), fully defined (a
- * prior reader already chose column types), or a hybrid.</li>
- * <li>The reader can load a subset of columns. Those that are
- * left out become "missing columns" to be filled in by this
- * class.</li>
- * <li>The <i>reader output schema</i> along with implicit and missing
- * columns, together define the scan's output schema.</li>
+ * <li>The reader is given a <i>reader input schema</i>, via the schema
+ * negotiator, which specifies the desired output schema. The schema can be
+ * fully dynamic (a wildcard), fully defined (a prior reader already chose
+ * column types), or a hybrid.</li>
+ * <li>The reader can load a subset of columns. Those that are left out become
+ * "missing columns" to be filled in by this class.</li>
+ * <li>The <i>reader output schema</i> along with implicit and missing columns,
+ * together define the scan's output schema.</li>
  * </ul>
  * <p>
- * The framework handles the projection task so the
- * reader does not have to worry about it. Reading an unwanted column
- * is low cost: the result set loader will have provided a "dummy" column
- * writer that simply discards the value. This is just as fast as having the
- * reader use if-statements or a table to determine which columns to save.
+ * The framework handles the projection task so the reader does not have to
+ * worry about it. Reading an unwanted column is low cost: the result set loader
+ * will have provided a "dummy" column writer that simply discards the value.
+ * This is just as fast as having the reader use if-statements or a table to
+ * determine which columns to save.
  */
 public class ReaderLifecycle implements RowBatchReader {
   private static final Logger logger = LoggerFactory.getLogger(ReaderLifecycle.class);
 
+  private enum State { START, DATA, FINAL, EOF }
+
   private final ScanLifecycle scanLifecycle;
   protected final TupleMetadata readerInputSchema;
   private ManagedReader reader;
@@ -107,12 +105,7 @@
   private StaticBatchBuilder implicitColumnsLoader;
   private StaticBatchBuilder missingColumnsHandler;
   private OutputBatchBuilder outputBuilder;
-
-  /**
-   * True once the reader reports EOF. This shim may keep going for another
-   * batch to handle any look-ahead row on the last batch.
-   */
-  private boolean eof;
+  private State state = State.START;
 
   public ReaderLifecycle(ScanLifecycle scanLifecycle) {
     this.scanLifecycle = scanLifecycle;
@@ -137,6 +130,8 @@
     return reader.getClass().getSimpleName();
   }
 
+  public ResultSetLoader tableLoader() { return tableLoader; }
+
   @Override
   public boolean open() {
     try {
@@ -168,6 +163,7 @@
   }
 
   public ResultSetLoader buildLoader() {
+    Preconditions.checkState(state == State.START);
     ResultSetOptionBuilder options = new ResultSetOptionBuilder()
         .rowCountLimit(Math.min(schemaNegotiator.batchSize, scanOptions().scanBatchRecordLimit()))
         .vectorCache(scanLifecycle.vectorCache())
@@ -176,12 +172,12 @@
         .projectionFilter(schemaTracker().projectionFilter(errorContext()))
         .readerSchema(schemaNegotiator.readerSchema);
 
-    // Resolve the scan scame if possible.
+    // Resolve the scan schema if possible.
     applyEarlySchema();
 
     // Create the table loader
     tableLoader = new ResultSetLoaderImpl(scanLifecycle.allocator(), options.build());
-    implicitColumnsLoader = schemaNegotiator.implicitColumnsLoader();
+    state = State.DATA;
     return tableLoader;
   }
 
@@ -205,12 +201,12 @@
 
   @Override
   public boolean defineSchema() {
-    if (!schemaNegotiator.isSchemaComplete()) {
-      return false;
+    boolean hasSchema = schemaNegotiator.isSchemaComplete() && schemaTracker().isResolved();
+    if (hasSchema) {
+      tableLoader.startBatch();
+      endBatch();
     }
-    tableLoader.startBatch();
-    endBatch();
-    return true;
+    return hasSchema;
   }
 
   @Override
@@ -218,7 +214,7 @@
 
     // The reader may report EOF, but the result set loader might
     // have a lookahead row.
-    if (isEof()) {
+    if (state == State.EOF) {
       return false;
     }
 
@@ -230,9 +226,11 @@
     // a new batch just to learn about EOF. Don't read if the reader
     // already reported EOF. In that case, we're just processing any last
     // lookahead row in the result set loader.
-    if (!eof) {
+    if (state == State.DATA) {
       try {
-        eof = !reader.next();
+        if (!reader.next()) {
+          state = State.FINAL;
+        }
       } catch (UserException e) {
         throw e;
       } catch (Exception e) {
@@ -252,11 +250,7 @@
     // Return EOF (false) only when the reader reports EOF
     // and the result set loader has drained its rows from either
     // this batch or lookahead rows.
-    return !isEof();
-  }
-
-  public boolean isEof() {
-    return eof && !tableLoader.hasRows();
+    return state != State.EOF;
   }
 
   /**
@@ -265,24 +259,21 @@
    * table row count. Then, merge the sources.
    */
   private void endBatch() {
+
+    // Let the schema negotiator finish up the batch. Needed for metadata
+    // scans on files.
+    // TODO: Modify the metadata system to handle non-file scans, then
+    // generalize the implicit columns parser, identify a new field to
+    // replace/augment fqn, and handle empty scans here.
+    schemaNegotiator.onEndBatch();
+
+    // Get the output batch
     VectorContainer readerOutput = tableLoader.harvest();
 
-    // Corner case: Reader produced no rows and defined no schema.
-    // We assume this is a null file and skip it as if we got an
-    // EOF from the constructor. There may be some actual case in
-    // which a file is known to have no records and no columns,
-    // which is an empty file. At present, there is no way to differentiate
-    // an intentional empty file from a null file.
-    if (readerOutput.getRecordCount() == 0 && tableLoader.schemaVersion() == 0) {
-      readerOutput.clear();
-      return;
-    }
-
-    // If not the first batch, and there are no rows, discard the batch
-    // as it adds no new information.
-    if (readerOutput.getRecordCount() == 0 && tableLoader.batchCount() > 1) {
+    if (readerOutput.getRecordCount() == 0 && !returnEmptyBatch(readerOutput)) {
       readerOutput.clear();
       outputBuilder = null;
+      state = State.EOF;
       return;
     }
 
@@ -292,6 +283,40 @@
       reviseOutputProjection(tableLoader.outputSchema());
     }
     buildOutputBatch(readerOutput);
+    scanLifecycle.tallyBatch();
+  }
+
+  /**
+   * The reader returned no data. Determine whether the empty batch should
+   * still be returned downstream to convey schema information (return
+   * {@code true}), or simply discarded (return {@code false}).
+   */
+  private boolean returnEmptyBatch(VectorContainer readerOutput) {
+
+    // If the batch is not the first, then it conveys no new info.
+    if (scanLifecycle.batchCount() > 0) {
+      return false;
+    }
+
+    // Corner case: Reader produced no rows and defined no schema.
+    // There are three sub-cases. In the first, the reader is "late-schema",
+    // it discovers schema as it reads, and did not read anything. In
+    // this case, we assume this is a null file and skip it as if we got an
+    // EOF from the constructor.
+    //
+    // The second sub-case is as above, but a schema was provided. In this
+    // case, we can produce an empty result set to convey that schema.
+    //
+    // A third, very obscure, case can occur in which a file
+    // is known to have no records and no columns. At present, there is
+    // no way to differentiate an intentional empty file from a null file.
+    if (tableLoader.schemaVersion() == 0) {
+      return schemaTracker().isResolved();
+    }
+
+    // Otherwise, we did define a schema on the first batch of the scan
+    // so we need to return it.
+    return true;
   }
 
   private void reviseOutputProjection(TupleMetadata readerOutputSchema) {
@@ -314,6 +339,12 @@
 
   private void buildOutputBatch(VectorContainer readerContainer) {
 
+    // Create the implicit columns loader after the first batch
+    // so we can report whether the file is empty.
+    if (tableLoader.batchCount() == 1) {
+      implicitColumnsLoader = schemaNegotiator.implicitColumnsLoader();
+    }
+
     // Get the batch results in a container.
     int rowCount = readerContainer.getRecordCount();
     if (implicitColumnsLoader != null) {
@@ -353,7 +384,7 @@
 
   @Override
   public int schemaVersion() {
-    return tableLoader.schemaVersion();
+    return schemaTracker().schemaVersion();
   }
 
   @Override
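
The former eof flag is now a four-state machine: START until buildLoader() runs, DATA while the reader still returns rows, FINAL once the reader reports EOF but a look-ahead row may remain in the result set loader, and EOF when nothing is left to deliver. A stripped-down, self-contained sketch of that progression follows; the class name and the simplified next() signature are illustrative only.

// Illustrative sketch only: models the START -> DATA -> FINAL -> EOF
// progression used by ReaderLifecycle above.
public class ReaderStateSketch {
  enum State { START, DATA, FINAL, EOF }

  private State state = State.START;

  void open() {
    state = State.DATA;
  }

  /** @return true while batches remain (or may remain), false at EOF. */
  boolean next(boolean readerHasMore, boolean loaderHasRows) {
    if (state == State.EOF) {
      return false;
    }
    if (state == State.DATA && !readerHasMore) {
      state = State.FINAL;   // EOF reported, but drain any look-ahead row
    }
    if (state == State.FINAL && !loaderHasRows) {
      state = State.EOF;     // nothing left to deliver downstream
    }
    return state != State.EOF;
  }
}
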
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
index 039810f..3261a05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
@@ -135,6 +135,7 @@
   private final ScanLifecycleBuilder options;
   private final ScanSchemaTracker schemaTracker;
   private final ReaderFactory<?> readerFactory;
+  private int batchCount;
 
   /**
    * Cache used to preserve the same vectors from one output batch to the
@@ -146,17 +147,20 @@
    */
   private final ResultVectorCacheImpl vectorCache;
 
-  public ScanLifecycle(OperatorContext context, ScanLifecycleBuilder options) {
+  public ScanLifecycle(OperatorContext context, ScanLifecycleBuilder builder) {
     this.context = context;
-    this.options = options;
+    this.options = builder;
     this.schemaTracker = new ScanSchemaConfigBuilder()
-        .projection(options.projection())
-        .definedSchema(options.definedSchema())
-        .providedSchema(options.providedSchema())
-        .allowSchemaChange(options.allowSchemaChange())
+        .projection(builder.projection())
+        .definedSchema(builder.definedSchema())
+        .providedSchema(builder.providedSchema())
+        .allowSchemaChange(builder.allowSchemaChange())
         .build();
+    if (builder.schemaValidator() != null) {
+      builder.schemaValidator().validate(schemaTracker);
+    }
     this.vectorCache = new ResultVectorCacheImpl(allocator(), false);
-    this.readerFactory = options.readerFactory();
+    this.readerFactory = builder.readerFactory();
   }
 
   public OperatorContext context() { return context; }
@@ -167,6 +171,8 @@
   public boolean hasOutputSchema() { return schemaTracker.isResolved(); }
   public CustomErrorContext errorContext() { return options.errorContext(); }
   public BufferAllocator allocator() { return context.getAllocator(); }
+  public void tallyBatch() { batchCount++; }
+  public int batchCount() { return batchCount; }
 
   public RowBatchReader nextReader() {
     if (readerFactory.hasNext()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
index 20b2a02..7b94cc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
 import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -73,6 +74,11 @@
   }
 
   @Override
+  public ProjectedColumn projectionFor(String colName) {
+    return readerLifecycle.scanLifecycle().schemaTracker().columnProjection(colName);
+  }
+
+  @Override
   public TupleMetadata providedSchema() {
     return readerLifecycle.scanOptions().providedSchema();
   }
@@ -92,6 +98,7 @@
     return baseErrorContext;
   }
 
+  @Override
   public CustomErrorContext errorContext() {
     return readerErrorContext == null ? baseErrorContext : readerErrorContext;
   }
@@ -151,4 +158,6 @@
   public ManagedReader newReader(ReaderFactory<?> readerFactory) throws EarlyEofException {
     return ((ReaderFactory<SchemaNegotiator>) readerFactory).next(this);
   }
+
+  protected void onEndBatch() { }
 }
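
The negotiator now exposes projectionFor(), letting a reader inspect how one of its columns was projected. A hedged reader-side sketch follows, assuming (as the @Override above suggests) that projectionFor() is declared on the SchemaNegotiator interface; the helper name and the null-means-not-listed interpretation are assumptions, only the projectionFor() call itself comes from this patch.

import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;

// Illustrative sketch only: a reader asking about the projection of a column.
public class ProjectionLookupSketch {

  /** True if the column appears explicitly in the projection list. */
  static boolean isExplicitlyProjected(SchemaNegotiator negotiator, String colName) {
    // Assumption: a null result means the column is not listed explicitly
    // (it may still be covered by a wildcard).
    ProjectedColumn proj = negotiator.projectionFor(colName);
    return proj != null;
  }
}
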
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java
index 1944916..064f4b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/StaticBatchBuilder.java
@@ -74,7 +74,9 @@
       for (int i = 0; i < rowCount; i++) {
         writer.start();
         for (int j = 0; j < n; j++) {
-          writers[j].setValue(values[j]);
+          if (values[j] != null) {
+            writers[j].setValue(values[j]);
+          }
         }
         writer.save();
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java
index 20de4f2..7646477 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/AbstractSchemaTracker.java
@@ -41,6 +41,13 @@
     this.errorContext = errorContext;
   }
 
+  /**
+   * Validate a projection list against a defined-schema tuple. Recursively walks
+   * the tree of maps to validate all nested tuples.
+   *
+   * @param projection the parsed projection list
+   * @param schema the defined schema to validate against
+   */
   protected static void validateProjection(TupleMetadata projection, TupleMetadata schema) {
     if (projection == null || SchemaUtils.isProjectAll(projection)) {
       return;
@@ -77,8 +84,25 @@
   @Override
   public int schemaVersion() { return schema.version(); }
 
+  /**
+   * Determine if the scan schema is resolved. In general, it is resolved
+   * when the underlying schema is resolved. However, an empty schema is
+   * trivially resolved, so for the {@code SELECT *} case we also require
+   * at least one column: something (a provided schema, an early reader
+   * schema) must have given us a schema. Once resolved, a schema can never
+   * become unresolved: readers are not allowed to add dynamic columns.
+   */
   protected void checkResolved() {
-    isResolved = schema.isResolved();
+    if (isResolved) {
+      return;
+    }
+    switch (projectionType()) {
+    case ALL:
+      isResolved = !schema.isEmpty() && schema.isResolved();
+      break;
+    default:
+      isResolved = schema.isResolved();
+    }
   }
 
   @Override
@@ -102,9 +126,8 @@
   private TupleMetadata implicitColumns() {
     TupleMetadata implicitCols = new TupleSchema();
     for (ColumnHandle handle : schema.columns()) {
-      ColumnMetadata col = handle.column();
-      if (SchemaUtils.isImplicit(col)) {
-        implicitCols.addColumn(col);
+      if (handle.isImplicit()) {
+        implicitCols.addColumn(handle.column());
       }
     }
     return implicitCols;
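
The resolution check now special-cases the wildcard: an empty schema is trivially "resolved", so a SELECT * scan only counts as resolved once at least one concrete column exists. The rule, reduced to a pure function for illustration (class and parameter names are illustrative only):

// Illustrative sketch only: the resolution rule from checkResolved() above.
public class ResolutionRuleSketch {

  static boolean resolved(boolean projectAll, boolean schemaEmpty, boolean schemaResolved) {
    // Under a wildcard, an empty schema does not count as resolved:
    // some source (provided schema, early reader schema) must supply columns.
    return projectAll ? (!schemaEmpty && schemaResolved) : schemaResolved;
  }
}
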
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleMetadata.java
deleted file mode 100644
index d5f46d2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleMetadata.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.scan.v3.schema;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-/**
- * A mutable form of a tuple schema. Allows insertions (at the wildcard position),
- * and replacing columns (as the schema becomes resolved). Tracks implicit columns
- * (those not filled in by the reader).
- * <p>
- * Does not implement the {@code TupleMetadata} interface because that interface
- * has far more functionality than is needed here, and assumes that column order
- * remains fixed (and hence columns can be addressed by position) which is not
- * true for this class.
- * <p>
- * This class represents the top-level tuple (the row.) Maps are also dynamic,
- * but provide a subset of resolution options:
- * map fields cannot be implicit. They can, however, be defined,
- * provided, discovered or missing. Map columns can start unresolved
- * if the map comes from projection. A map itself can be resolved,
- * but its members may be unresolved. New map members may only be added at the
- * end (there is no equivalent of a wildcard position.)
- */
-public class MutableTupleMetadata {
-
-  /**
-   * Holder for a column to allow inserting and replacing columns within
-   * the top-level project list. Changes to the column within the holder
-   * must go through the tuple itself so we can track schema versions.
-   * <p>
-   * Tracks the resolution status of each individual column as
-   * described for {@link ScanSchemaTracker}. Models a column throughout the
-   * projection lifecycle. Columns evolve from unresolved to resolved at
-   * different times. Columns are either implicit (defined by the framework)
-   * or normal (defined by the reader). Columns can be defined by the
-   * planner (via a defined schema), partially defined (via a provided
-   * schema), or discovered by the reader. Regardless of the path
-   * to definition, by the time the first batch is delivered downstream,
-   * each column has an output schema which describes the data.
-   */
-  public static class ColumnHandle {
-    private ColumnMetadata col;
-    private boolean isImplicit;
-
-    public ColumnHandle(ColumnMetadata col) {
-      this.col = col;
-      this.isImplicit = SchemaUtils.isImplicit(col);
-    }
-
-    public String name() {
-      return col.name();
-    }
-
-    private void replace(ColumnMetadata col) {
-      this.col = col;
-    }
-
-    private void resolve(ColumnMetadata col) {
-      SchemaUtils.mergeColProperties(this.col, col);
-      this.col = col;
-    }
-
-    private void resolveImplicit(ColumnMetadata col) {
-      SchemaUtils.mergeColProperties(this.col, col);
-      this.col = col;
-      markImplicit();
-    }
-
-    public void markImplicit() {
-      Preconditions.checkState(SchemaUtils.isImplicit(col));
-      isImplicit = true;
-    }
-
-    public ColumnMetadata column() { return col; }
-    public boolean isImplicit() { return isImplicit; }
-
-    @Override
-    public String toString() {
-      return col.toString();
-    }
-  }
-
-  protected final List<MutableTupleMetadata.ColumnHandle> columns = new ArrayList<>();
-  protected final Map<String, MutableTupleMetadata.ColumnHandle> nameIndex =
-      CaseInsensitiveMap.newHashMap();
-  private ProjectionType projType;
-  private int insertPoint = -1;
-  private int version;
-
-  public void setProjectionType(ScanSchemaTracker.ProjectionType type) {
-    this.projType = type;
-  }
-
-  public void setInsertPoint(int insertPoint) {
-    Preconditions.checkArgument(insertPoint == -1 ||
-        insertPoint >= 0 && insertPoint <= size());
-    this.insertPoint = insertPoint;
-  }
-
-  public ScanSchemaTracker.ProjectionType projectionType() { return projType; }
-  public int size() { return columns.size(); }
-  public int version() { return version; }
-
-  /**
-   * Provide the list of partially-resolved columns. Primarily for
-   * the implicit column parser.
-   */
-  public List<MutableTupleMetadata.ColumnHandle> columns() { return columns; }
-
-  public MutableTupleMetadata.ColumnHandle find(String colName) {
-    return nameIndex.get(colName);
-  }
-
-  public void copyFrom(TupleMetadata from) {
-    if (from.isEmpty()) {
-      return;
-    }
-    for (ColumnMetadata projCol : from) {
-      add(projCol.copy());
-    }
-    version++;
-  }
-
-  public void add(ColumnMetadata col) {
-    MutableTupleMetadata.ColumnHandle holder = new ColumnHandle(col);
-    columns.add(holder);
-    addIndex(holder);
-    version++;
-  }
-
-  public void addIndex(MutableTupleMetadata.ColumnHandle holder) {
-    if (nameIndex.put(holder.column().name(), holder) != null) {
-      throw new IllegalArgumentException("Duplicate scan projection column: " + holder.name());
-    }
-  }
-
-  public void insert(int posn, ColumnMetadata col) {
-    MutableTupleMetadata.ColumnHandle holder = new ColumnHandle(col);
-    columns.add(posn, holder);
-    addIndex(holder);
-    version++;
-  }
-
-  public void insert(ColumnMetadata col) {
-    insert(insertPoint++, col);
-  }
-
-  public boolean isResolved() {
-    for (MutableTupleMetadata.ColumnHandle handle : columns) {
-      if (!isColumnResolved(handle.column())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean isColumnResolved(ColumnMetadata col) {
-    return !col.isDynamic() && (!col.isMap() || isMapResolved(col.tupleSchema()));
-  }
-
-  private boolean isMapResolved(TupleMetadata mapSchema) {
-    for (ColumnMetadata col : mapSchema) {
-      if (col.isDynamic()) {
-        return false;
-      }
-      if (col.isMap() && !isMapResolved(col.tupleSchema())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public TupleMetadata toSchema() {
-    TupleMetadata schema = new TupleSchema();
-    for (MutableTupleMetadata.ColumnHandle col : columns) {
-      schema.addColumn(col.column());
-    }
-    return schema;
-  }
-
-  public void resolveImplicit(ColumnHandle col, ColumnMetadata resolved) {
-    col.resolveImplicit(resolved);
-    version++;
-  }
-
-  public void replace(ColumnHandle col, ColumnMetadata resolved) {
-    col.replace(resolved);
-    version++;
-  }
-
-  public void resolve(ColumnHandle col, ColumnMetadata resolved) {
-    col.resolve(resolved);
-    version++;
-  }
-}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
index ba3483f..89883b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/MutableTupleSchema.java
@@ -20,8 +20,10 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -65,11 +67,10 @@
    */
   public static class ColumnHandle {
     private ColumnMetadata col;
-    private boolean isImplicit;
+    private ImplicitColumnMarker marker;
 
     public ColumnHandle(ColumnMetadata col) {
       this.col = col;
-      this.isImplicit = SchemaUtils.isImplicit(col);
     }
 
     public String name() {
@@ -85,19 +86,18 @@
       this.col = col;
     }
 
-    private void resolveImplicit(ColumnMetadata col) {
+    private void resolveImplicit(ColumnMetadata col, ImplicitColumnMarker marker) {
       SchemaUtils.mergeColProperties(this.col, col);
       this.col = col;
-      markImplicit();
+      markImplicit(marker);
     }
 
-    public void markImplicit() {
-      Preconditions.checkState(SchemaUtils.isImplicit(col));
-      isImplicit = true;
+    public void markImplicit(ImplicitColumnMarker marker) {
+      this.marker = marker;
     }
 
     public ColumnMetadata column() { return col; }
-    public boolean isImplicit() { return isImplicit; }
+    public boolean isImplicit() { return marker != null; }
 
     @Override
     public String toString() {
@@ -105,8 +105,8 @@
     }
   }
 
-  protected final List<MutableTupleSchema.ColumnHandle> columns = new ArrayList<>();
-  protected final Map<String, MutableTupleSchema.ColumnHandle> nameIndex =
+  protected final List<ColumnHandle> columns = new ArrayList<>();
+  protected final Map<String, ColumnHandle> nameIndex =
       CaseInsensitiveMap.newHashMap();
   private ProjectionType projType;
   private int insertPoint = -1;
@@ -114,6 +114,10 @@
 
   public void setProjectionType(ScanSchemaTracker.ProjectionType type) {
     this.projType = type;
+
+    // For project none, an empty schema is valid, so
+    // force a bump in schema version.
+    version++;
   }
 
   public void setInsertPoint(int insertPoint) {
@@ -130,9 +134,9 @@
    * Provide the list of partially-resolved columns. Primarily for
    * the implicit column parser.
    */
-  public List<MutableTupleSchema.ColumnHandle> columns() { return columns; }
+  public List<ColumnHandle> columns() { return columns; }
 
-  public MutableTupleSchema.ColumnHandle find(String colName) {
+  public ColumnHandle find(String colName) {
     return nameIndex.get(colName);
   }
 
@@ -147,31 +151,52 @@
   }
 
   public void add(ColumnMetadata col) {
-    MutableTupleSchema.ColumnHandle holder = new ColumnHandle(col);
+    ColumnHandle holder = new ColumnHandle(col);
     columns.add(holder);
     addIndex(holder);
     version++;
   }
 
-  public void addIndex(MutableTupleSchema.ColumnHandle holder) {
+  public void addIndex(ColumnHandle holder) {
     if (nameIndex.put(holder.column().name(), holder) != null) {
       throw new IllegalArgumentException("Duplicate scan projection column: " + holder.name());
     }
   }
 
-  public void insert(int posn, ColumnMetadata col) {
-    MutableTupleSchema.ColumnHandle holder = new ColumnHandle(col);
+  public ColumnHandle insert(int posn, ColumnMetadata col) {
+    ColumnHandle holder = new ColumnHandle(col);
     columns.add(posn, holder);
     addIndex(holder);
     version++;
+    return holder;
   }
 
-  public void insert(ColumnMetadata col) {
-    insert(insertPoint++, col);
+  public ColumnHandle insert(ColumnMetadata col) {
+    return insert(insertPoint++, col);
+  }
+
+  /**
+   * Move a column from its current position (which must be past the insert point)
+   * to the insert point. An index entry already exists. Special operation done
+   * when matching a provided schema to a projection list that includes a wildcard
+   * and explicitly projected columns. Works around unfortunate behavior in the
+   * planner.
+   */
+  public void moveIfExplicit(String colName) {
+    ColumnHandle holder = find(colName);
+    Objects.requireNonNull(holder);
+    int posn = columns.indexOf(holder);
+    if (posn == insertPoint) {
+      insertPoint++;
+    } else if (posn > insertPoint) {
+      columns.remove(posn);
+      columns.add(insertPoint++, holder);
+      version++;
+    }
   }
 
   public boolean isResolved() {
-    for (MutableTupleSchema.ColumnHandle handle : columns) {
+    for (ColumnHandle handle : columns) {
       if (!isColumnResolved(handle.column())) {
         return false;
       }
@@ -197,14 +222,14 @@
 
   public TupleMetadata toSchema() {
     TupleMetadata schema = new TupleSchema();
-    for (MutableTupleSchema.ColumnHandle col : columns) {
+    for (ColumnHandle col : columns) {
       schema.addColumn(col.column());
     }
     return schema;
   }
 
-  public void resolveImplicit(ColumnHandle col, ColumnMetadata resolved) {
-    col.resolveImplicit(resolved);
+  public void resolveImplicit(ColumnHandle col, ColumnMetadata resolved, ImplicitColumnMarker marker) {
+    col.resolveImplicit(resolved, marker);
     version++;
   }
 
@@ -217,4 +242,6 @@
     col.resolve(resolved);
     version++;
   }
+
+  public boolean isEmpty() { return columns.isEmpty(); }
 }
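
Not part of the patch: the list manipulation behind moveIfExplicit(), shown on plain column names. An explicitly projected column that would otherwise trail the wildcard-expanded columns is pulled back to the insert point so the output keeps the source-schema order. Names and the example schema below are illustrative only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: the reordering done by moveIfExplicit() above,
// demonstrated on a plain list of column names.
public class MoveIfExplicitSketch {

  static void moveToInsertPoint(List<String> columns, int insertPoint, String name) {
    int posn = columns.indexOf(name);
    if (posn > insertPoint) {
      columns.remove(posn);
      columns.add(insertPoint, name);
    }
  }

  public static void main(String[] args) {
    // Projection [`**`, `bar`] against source schema (foo, bar, baz):
    // bar would otherwise trail the wildcard-expanded columns.
    List<String> cols = new ArrayList<>(Arrays.asList("foo", "baz", "bar"));
    moveToInsertPoint(cols, 1, "bar");
    System.out.println(cols); // [foo, bar, baz]
  }
}
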
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java
index 3e1373e..a11b675 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ProjectionSchemaTracker.java
@@ -18,13 +18,13 @@
 package org.apache.drill.exec.physical.impl.scan.v3.schema;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.DynamicSchemaFilter.RowSchemaFilter;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaResolver.SchemaType;
 import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
  * Schema tracker for the "normal" case in which schema starts from a simple
@@ -37,6 +37,7 @@
   private final TupleMetadata projection;
   private final boolean allowSchemaChange;
   private int implicitInsertPoint;
+  private int readerSchemaCount;
   private boolean allowMapAdditions = true;
 
   public ProjectionSchemaTracker(TupleMetadata definedSchema,
@@ -47,16 +48,16 @@
     this.allowSchemaChange = false;
     schema.copyFrom(definedSchema);
     validateProjection(parseResult.dynamicSchema, definedSchema);
-    checkResolved();
 
-    ScanSchemaTracker.ProjectionType projType;
+    ProjectionType projType;
     if (schema.size() == 0) {
-      projType = ScanSchemaTracker.ProjectionType.NONE;
+      projType = ProjectionType.NONE;
     } else {
-      projType = ScanSchemaTracker.ProjectionType.SOME;
+      projType = ProjectionType.SOME;
     }
     schema.setProjectionType(projType);
     this.implicitInsertPoint = -1;
+    checkResolved();
   }
 
   public ProjectionSchemaTracker(ProjectionParseResult parseResult, boolean allowSchemaChange,
@@ -67,15 +68,15 @@
     this.schema.copyFrom(projection);
 
     // Work out the projection type: wildcard, empty, or explicit.
-    ScanSchemaTracker.ProjectionType projType;
+    ProjectionType projType;
     if (parseResult.isProjectAll()) {
-      projType = ScanSchemaTracker.ProjectionType.ALL;
+      projType = ProjectionType.ALL;
     } else if (projection.isEmpty()) {
-      projType = ScanSchemaTracker.ProjectionType.NONE;
+      projType = ProjectionType.NONE;
       this.isResolved = true;
       this.allowMapAdditions = false;
     } else {
-      projType = ScanSchemaTracker.ProjectionType.SOME;
+      projType = ProjectionType.SOME;
     }
     this.schema.setProjectionType(projType);
 
@@ -84,6 +85,11 @@
     this.implicitInsertPoint = parseResult.wildcardPosn;
   }
 
+  @Override
+  public ProjectedColumn columnProjection(String colName) {
+    return (ProjectedColumn) projection.metadata(colName);
+  }
+
   public void applyProvidedSchema(TupleMetadata providedSchema) {
     boolean isStrict = SchemaUtils.isStrict(providedSchema);
     new ScanSchemaResolver(schema,
@@ -132,6 +138,8 @@
   public ProjectionFilter projectionFilter(CustomErrorContext errorContext) {
     switch (projectionType()) {
       case ALL:
+
+        // An empty schema implies we've only seen the wildcard so far.
         if (schema.size() == 0) {
           return ProjectionFilter.PROJECT_ALL;
         }
@@ -146,7 +154,17 @@
   @Override
   public void applyReaderSchema(TupleMetadata readerOutputSchema,
       CustomErrorContext errorContext) {
-    new ScanSchemaResolver(schema, SchemaType.READER_SCHEMA, allowMapAdditions, errorContext)
+    SchemaType schemaType;
+
+    // The first reader can reposition columns projected with a wildcard;
+    // later readers cannot, since we want to preserve column order after
+    // the first batch.
+    if (readerSchemaCount == 0 && allowSchemaChange) {
+      schemaType = SchemaType.FIRST_READER_SCHEMA;
+    } else {
+      schemaType = SchemaType.READER_SCHEMA;
+    }
+    new ScanSchemaResolver(schema, schemaType, allowMapAdditions, errorContext)
         .applySchema(readerOutputSchema);
     if (!allowSchemaChange) {
       allowMapAdditions = false;
@@ -155,11 +173,11 @@
       }
     }
     checkResolved();
+    readerSchemaCount++;
   }
 
   @Override
-  public void expandImplicitCol(ColumnMetadata resolved) {
-    Preconditions.checkArgument(SchemaUtils.isImplicit(resolved));
-    schema.insert(implicitInsertPoint++, resolved);
+  public void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker) {
+    schema.insert(implicitInsertPoint++, resolved).markImplicit(marker);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java
index 24511a1..9af575a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaConfigBuilder.java
@@ -71,6 +71,8 @@
     if (errorContext == null) {
       errorContext = EmptyErrorContext.INSTANCE;
     }
+
+    // Parse the projection list
     ProjectionParseResult result;
     if (projectionList == null) {
       result = null;
@@ -78,21 +80,45 @@
       result = ScanProjectionParser.parse(projectionList);
     }
 
+    // If a strict schema is provided, then no schema changes are allowed.
     if (providedSchema != null && SchemaUtils.isStrict(providedSchema)) {
       allowSchemaChange = false;
     }
+
+    // Figure out the schema tracker to use
     if (definedSchema == null) {
+
+      // No defined schema: this is a projection-based tracker, possibly
+      // constrained by a provided schema.
       ProjectionSchemaTracker tracker = new ProjectionSchemaTracker(result, allowSchemaChange, errorContext);
+
+      // Apply the provided schema. Doing so forces resolution of the projection
+      // list just applied above.
       if (providedSchema != null) {
         tracker.applyProvidedSchema(providedSchema);
       }
       return tracker;
     } else {
+
+      // Defined schema case, which is supported only via tests at present;
+      // the planner can't yet produce a defined schema.
+
+      // A defined schema can include dynamic columns (those with no type). If
+      // so, treat the dynamic schema as a combination of a projection list and a
+      // provided schema.
       if (!MetadataUtils.hasDynamicColumns(definedSchema)) {
         SchemaBasedTracker tracker = new SchemaBasedTracker(definedSchema, errorContext);
-        tracker.validateProjection(result.dynamicSchema);
+
+        // A projection list is not required. But, if provided, it must be consistent
+        // with the defined schema.
+        if (result != null) {
+          tracker.validateProjection(result.dynamicSchema);
+        }
         return tracker;
       } else {
+
+        // The defined schema has dynamic columns, so it is only partially
+        // defined: treat it as a projection list combined with a provided schema.
         return new ProjectionSchemaTracker(definedSchema, result, errorContext);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java
index 6d1ec40..34ceb9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaResolver.java
@@ -82,13 +82,14 @@
    * Indicates the source of the schema to be analyzed.
    * Each schema type has subtly different rules. The
    * schema type allows us to inject those differences inline
-   * within the resolution process. Also, each schema caries
+   * within the resolution process. Also, each schema carries
    * a tag used for error reporting.
    */
   public enum SchemaType {
     STRICT_PROVIDED_SCHEMA("Provided"),
     LENIENT_PROVIDED_SCHEMA("Provided"),
     EARLY_READER_SCHEMA("Reader"),
+    FIRST_READER_SCHEMA("Reader"),
     READER_SCHEMA("Reader"),
     MISSING_COLS("Missing columns");
 
@@ -105,41 +106,64 @@
 
   private final MutableTupleSchema schema;
   private final SchemaType mode;
-  private final boolean isProjectAll;
   private final boolean allowMapAdditions;
   private final String source;
   private final CustomErrorContext errorContext;
+  private final boolean allowColumnReorder;
 
   public ScanSchemaResolver(MutableTupleSchema schema, SchemaType mode,
       boolean allowMapAdditions,
       CustomErrorContext errorContext) {
     this.schema = schema;
-    this.isProjectAll = schema.projectionType() == ProjectionType.ALL;
     this.mode = mode;
     this.errorContext = errorContext;
     this.allowMapAdditions = allowMapAdditions;
     this.source = mode.source();
+    switch (mode) {
+      case STRICT_PROVIDED_SCHEMA:
+      case LENIENT_PROVIDED_SCHEMA:
+      case EARLY_READER_SCHEMA:
+      case FIRST_READER_SCHEMA:
+
+        // Allow reordering columns when the projection is of the form
+        // `*, bar`: move `bar` to its place within the source schema rather
+        // than leaving it at the end, as is done for implicit columns.
+        this.allowColumnReorder = schema.projectionType() == ProjectionType.ALL;
+        break;
+      default:
+        this.allowColumnReorder = false;
+    }
   }
 
   public void applySchema(TupleMetadata sourceSchema) {
     switch (schema.projectionType()) {
       case ALL:
+        projectSchema(sourceSchema);
+        if (mode == SchemaType.STRICT_PROVIDED_SCHEMA) {
+          schema.setProjectionType(ScanSchemaTracker.ProjectionType.SOME);
+        }
+        break;
       case SOME:
         projectSchema(sourceSchema);
         break;
       default:
         // Do nothing
     }
-    if (mode == SchemaType.STRICT_PROVIDED_SCHEMA && isProjectAll) {
-      schema.setProjectionType(ScanSchemaTracker.ProjectionType.SOME);
-    }
   }
 
   /**
-   * A project list can contain implicit columns in
-   * addition to the wildcard. The wildcard defines the
-   * <i>insert point</i>: the point at which reader-defined
-   * columns are inserted as found.
+   * A project list can contain implicit columns in addition to the wildcard.
+   * The wildcard defines the <i>insert point</i>: the point at which
+   * reader-defined columns are inserted as found. This version applies a
+   * provided schema to a projection. If we are given a query of the form
+   * {@code SELECT * FROM foo ORDER BY bar}, Drill will give us a projection
+   * list of the form {@code [`**`, `bar`]} and normal projection processing
+   * will project all provided columns, except {@code bar}, in place of the
+   * wildcard. Since this behavior differs from all other DBs, we apply special
+   * processing: we move the projection column into the next wildcard position
+   * as if Drill did not include the extra column projection. This is a hack,
+   * but one that helps with ease-of-use. We apply the same rule to the first
+   * reader schema for the same reason.
    */
   private void projectSchema(TupleMetadata sourceSchema) {
     for (ColumnMetadata colSchema : sourceSchema) {
@@ -148,6 +172,9 @@
         insertColumn(colSchema);
       } else {
         mergeColumn(existing, colSchema);
+        if (allowColumnReorder) {
+          schema.moveIfExplicit(colSchema.name());
+        }
       }
     }
   }
@@ -160,8 +187,9 @@
    */
   private void insertColumn(ColumnMetadata col) {
     switch (mode) {
+      case FIRST_READER_SCHEMA:
       case READER_SCHEMA:
-        if (!isProjectAll) {
+        if (schema.projectionType() != ProjectionType.ALL) {
           throw new IllegalStateException(
               "Reader should not have projected an unprojected column: " + col.name());
         }
@@ -169,7 +197,7 @@
       case EARLY_READER_SCHEMA:
       case LENIENT_PROVIDED_SCHEMA:
       case STRICT_PROVIDED_SCHEMA:
-        if (!isProjectAll || SchemaUtils.isExcludedFromWildcard(col)) {
+        if (schema.projectionType() != ProjectionType.ALL || SchemaUtils.isExcludedFromWildcard(col)) {
           return;
         }
         break;
@@ -200,16 +228,9 @@
     switch (mode) {
       case LENIENT_PROVIDED_SCHEMA:
       case STRICT_PROVIDED_SCHEMA:
-        // With a wilcard, there should be no existing column unless
-        // the planner projected an implicit column and the provided
-        // schema defines that same implicit column.
-        if (isProjectAll && !SchemaUtils.isImplicit(colSchema)) {
-          throw UserException.validationError()
-            .message("Provided schema column name conflicts with presumed implicit column name")
-            .addContext("Column", colSchema.name())
-            .addContext(errorContext)
-            .build(logger);
-        }
+        // Even with a wildcard, the planner may add additional columns.
+        // Example: SELECT * FROM foo ORDER BY bar, for which the planner
+        // provides the projection list [`**`, `bar`].
         break;
       case EARLY_READER_SCHEMA:
         // If the reader offers a column which duplicates an implicit column,
@@ -341,6 +362,7 @@
       case LENIENT_PROVIDED_SCHEMA:
       case STRICT_PROVIDED_SCHEMA:
         break;
+      case FIRST_READER_SCHEMA:
       case READER_SCHEMA:
         if (!allowMapAdditions) {
           throw new IllegalStateException("Reader should not have projected column: " + readerCol.name());
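
The column-reorder rule added to ScanSchemaResolver above is easiest to see with a toy model. The sketch below is illustrative only (it is not Drill's resolver): given the projection [`**`, `bar`] that the planner emits for SELECT * FROM foo ORDER BY bar, and a provided schema (a, bar, c), the explicitly projected column bar stays in its provided-schema slot instead of being appended after the wildcard expansion.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReorderSketch {

  // Hypothetical helper: expand the wildcard against the provided schema,
  // moving explicitly projected columns into their schema positions instead
  // of appending them after the expansion.
  static List<String> resolve(List<String> projection, List<String> providedSchema) {
    List<String> explicit = new ArrayList<>(projection);
    explicit.remove("**");                 // drop the wildcard marker
    List<String> output = new ArrayList<>();
    for (String col : providedSchema) {
      output.add(col);                     // provided-schema order wins
      explicit.remove(col);                // so drop the explicit duplicate
    }
    output.addAll(explicit);               // unknown columns still go to the end
    return output;
  }

  public static void main(String[] args) {
    // Projection produced for: SELECT * FROM foo ORDER BY bar
    List<String> projection = Arrays.asList("**", "bar");
    List<String> provided = Arrays.asList("a", "bar", "c");
    System.out.println(resolve(projection, provided));  // prints [a, bar, c]
  }
}
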
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java
index 7d1aa30..af5efa4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/ScanSchemaTracker.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.v3.schema;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -305,6 +306,11 @@
   ProjectionType projectionType();
 
   /**
+   * Return the projection for a column, if any.
+   */
+  ProjectedColumn columnProjection(String colName);
+
+  /**
    * Is the scan schema resolved? The schema is resolved depending on the
    * complex lifecycle explained in the class comment. Resolution occurs
    * when the wildcard (if any) is expanded, and all explicit projection
@@ -349,7 +355,7 @@
    * then determines which partition columns are needed and calls this
    * method to add each one.
    */
-  void expandImplicitCol(ColumnMetadata resolved);
+  void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker);
 
   /**
    * Indicate that implicit column parsing is complete. Returns the implicit
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java
index 24beb38..8e5d6ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/SchemaBasedTracker.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.scan.v3.schema;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnMarker;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.DynamicSchemaFilter.RowSchemaFilter;
 import org.apache.drill.exec.physical.resultSet.impl.ProjectionFilter;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -39,10 +40,6 @@
     super(errorContext);
     this.definedSchema = definedSchema;
     schema.copyFrom(definedSchema);
-    checkResolved();
-
-    // If not resolved, should not have used this tracker.
-    Preconditions.checkState(isResolved);
 
     ScanSchemaTracker.ProjectionType projType;
     if (schema.size() == 0) {
@@ -51,13 +48,24 @@
       projType = ScanSchemaTracker.ProjectionType.SOME;
     }
     schema.setProjectionType(projType);
+    checkResolved();
+
+    // If not resolved, should not have used this tracker.
+    Preconditions.checkState(isResolved);
   }
 
+  /**
+   * Validate a projection list (provided as an argument) against a
+   * defined schema already held by this tracker. Ensures that, when we
+   * have both a defined schema and projection list, that they are
+   * consistent.
+   *
+   * @param projection the parsed projection list
+   */
   public void validateProjection(TupleMetadata projection) {
-    if (projection == null) {
-      return;
+    if (projection != null) {
+      validateProjection(projection, definedSchema);
     }
-    validateProjection(projection, definedSchema);
   }
 
   @Override
@@ -85,10 +93,13 @@
   }
 
   @Override
-  public void expandImplicitCol(ColumnMetadata resolved) {
+  public void expandImplicitCol(ColumnMetadata resolved, ImplicitColumnMarker marker) {
     throw new IllegalStateException("Can't expand a defined schema.");
   }
 
   @Override
   public int schemaVersion() { return 1; }
+
+  @Override
+  public ProjectedColumn columnProjection(String colName) { return null; }
 }
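
The intent of SchemaBasedTracker.validateProjection() above can be modeled with a small, self-contained sketch. This is not Drill's implementation; it is a toy version of the consistency idea only, with hypothetical names.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ValidateProjectionSketch {

  // Toy version of the check: every explicitly projected column
  // (wildcard aside) must be known to the defined schema.
  static void validateProjection(List<String> projection, Set<String> definedSchema) {
    if (projection == null) {
      return;                              // nothing to check, as in the patched method
    }
    for (String col : projection) {
      if (!col.equals("**") && !definedSchema.contains(col)) {
        throw new IllegalArgumentException(
            "Projected column not in defined schema: " + col);
      }
    }
  }

  public static void main(String[] args) {
    Set<String> defined = new HashSet<>(Arrays.asList("a", "b", "c"));
    validateProjection(Arrays.asList("a", "b"), defined);  // consistent
    validateProjection(null, defined);                     // no projection: nothing to do
  }
}
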
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
index 08df4b3..bf9ba76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
@@ -78,7 +78,6 @@
    *          variable-length argument list of column values
    * @return this writer
    */
-
   RowSetLoader addRow(Object... values);
 
   /**
@@ -89,7 +88,6 @@
    * @param value value of the one and only column
    * @return this writer
    */
-
   RowSetLoader addSingleCol(Object value);
 
   /**
@@ -107,7 +105,6 @@
    *
    * @return true if another row can be written, false if not
    */
-
   boolean isFull();
 
   /**
@@ -116,7 +113,6 @@
    *
    * @return number of rows to be sent downstream
    */
-
   int rowCount();
 
   /**
@@ -128,7 +124,6 @@
    *
    * @return the current write index
    */
-
   int rowIndex();
 
   /**
@@ -151,7 +146,6 @@
    *
    * @return true if another row can be added, false if the batch is full
    */
-
   boolean start();
 
   /**
@@ -160,6 +154,5 @@
    * to recover from partially-written rows that turn out to contain errors.
    * Done automatically if using <tt>setRow()</tt>.
    */
-
   void save();
 }
\ No newline at end of file
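
The RowSetLoader Javadoc cleaned up above describes the per-batch write cycle. A hedged usage sketch follows; the loader wiring, the input source, and the Record type are assumptions, and only the loop structure is the point.

@Override
public boolean next() {
  RowSetLoader writer = loader.writer();          // loader: an assumed ResultSetLoader
  while (!writer.isFull() && source.hasNext()) {  // stop when the batch fills
    Record rec = source.next();                   // hypothetical input source/record
    writer.start();                               // begin a row
    writer.scalar(0).setInt(rec.id);
    writer.scalar(1).setString(rec.name);
    writer.save();                                // commit the row
  }
  // Tests often use the convenience form instead: writer.addRow(10, "ten");
  return source.hasNext();                        // true if more data remains
}
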
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 0467abc..8fe4ebd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -317,7 +317,7 @@
    * @param length   row group length
    * @return implicit column value for specified implicit file column
    */
-  private static String getImplicitColumnValue(ImplicitFileColumn column, Path filePath,
+  public static String getImplicitColumnValue(ImplicitFileColumn column, Path filePath,
       FileSystem fs, Integer index, Long start, Long length) {
     if (column instanceof ImplicitFileColumns) {
       ImplicitFileColumns fileColumn = (ImplicitFileColumns) column;
@@ -359,9 +359,10 @@
   }
 
   /**
-   * Returns list of implicit file columns which includes all elements from {@link ImplicitFileColumns},
-   * {@link ImplicitInternalFileColumns#LAST_MODIFIED_TIME} and {@link ImplicitInternalFileColumns#USE_METADATA}
-   * columns.
+   * Returns list of implicit file columns which includes all elements from
+   * {@link ImplicitFileColumns},
+   * {@link ImplicitInternalFileColumns#LAST_MODIFIED_TIME} and
+   * {@link ImplicitInternalFileColumns#USE_METADATA} columns.
    *
    * @return list of implicit file columns
    */
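
Since getImplicitColumnValue() is now public, readers can compute individual implicit column values directly. A hedged sketch: ImplicitFileColumns.FQN is referenced elsewhere in this patch, but its nesting under ColumnExplorer and the null row-group arguments are assumptions made for illustration.

FileSystem fs = FileSystem.get(new Configuration());
Path filePath = new Path("hdfs:///w/x/y/a.csv");
// Null row-group index/start/length: assumed to apply only to plain file
// columns such as FQN, not to the row-group columns.
String fqn = ColumnExplorer.getImplicitColumnValue(
    ColumnExplorer.ImplicitFileColumns.FQN, filePath, fs, null, null, null);
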
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
index c22d453..580ba7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -23,9 +23,9 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
 import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -72,7 +72,7 @@
     TupleMetadata readerSchema = AvroSchemaUtil.convert(reader.getSchema());
     logger.debug("Avro file converted schema: {}", readerSchema);
     TupleMetadata providedSchema = negotiator.providedSchema();
-    TupleMetadata tableSchema = StandardConversions.mergeSchemas(providedSchema, readerSchema);
+    TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
     logger.debug("Avro file table schema: {}", tableSchema);
     negotiator.tableSchema(tableSchema, true);
     loader = negotiator.build();
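
The schema merge now lives on FixedReceiver.Builder rather than StandardConversions. A sketch of the replacement call, with hypothetical column names; the expectation that provided definitions take precedence where names match reflects the usual provided-schema behavior, not a verified detail of this method.

TupleMetadata providedSchema = new SchemaBuilder()
    .add("id", MinorType.BIGINT)
    .add("name", MinorType.VARCHAR)
    .buildSchema();
TupleMetadata readerSchema = new SchemaBuilder()
    .add("id", MinorType.VARCHAR)      // file reports VARCHAR, provided schema says BIGINT
    .add("name", MinorType.VARCHAR)
    .add("extra", MinorType.INT)       // discovered only by the reader
    .buildSchema();
// Provided definitions are expected to take precedence where names match;
// reader-only columns are carried along.
TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
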
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java
index 3c42227..527f21d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/ColumnConverterFactory.java
@@ -21,6 +21,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -57,7 +58,7 @@
     if (providedSchema == null) {
       standardConversions = null;
     } else {
-      standardConversions = new StandardConversions(providedSchema.properties());
+      standardConversions = StandardConversions.builder().withSchema(providedSchema).build();
     }
   }
 
@@ -157,7 +158,7 @@
     if (standardConversions == null) {
       valueWriter = scalarWriter;
     } else {
-      valueWriter = standardConversions.converter(scalarWriter, readerSchema);
+      valueWriter = standardConversions.converterFor(scalarWriter, readerSchema);
     }
     return buildScalar(readerSchema, valueWriter);
   }
@@ -246,7 +247,7 @@
       TupleWriter tupleWriter, List<ColumnConverter> converters) {
     // fill in tuple schema for cases when it contains recursive named record types
     TupleMetadata readerSchema = AvroSchemaUtil.convert(genericRecord.getSchema());
-    TupleMetadata tableSchema = StandardConversions.mergeSchemas(providedSchema, readerSchema);
+    TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
     tableSchema.toMetadataList().forEach(tupleWriter::addColumn);
 
     IntStream.range(0, tableSchema.size())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 8e3e69e..79840c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -19,8 +19,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -160,7 +158,7 @@
     for (int i = 0; i < fieldNames.length; i++) {
       ScalarWriter colWriter = writer.scalar(fieldNames[i]);
       if (writer.isProjected()) {
-        colWriters[i] = conversions.converter(colWriter, MinorType.VARCHAR);
+        colWriters[i] = conversions.converterFor(colWriter, MinorType.VARCHAR);
       } else {
         colWriters[i] = colWriter;
       }
@@ -220,7 +218,7 @@
     StandardConversions conversions = conversions(providedSchema);
     ValueWriter[] colWriters = new ValueWriter[providedSchema.size()];
     for (int i = 0; i < colWriters.length; i++) {
-      colWriters[i] = conversions.converter(
+      colWriters[i] = conversions.converterFor(
           writer.scalar(providedSchema.metadata(i).name()), MinorType.VARCHAR);
     }
     return new ConstrainedFieldOutput(writer, colWriters);
@@ -248,14 +246,10 @@
 
     // CSV maps blank columns to nulls (for nullable non-string columns),
     // or to the default value (for non-nullable non-string columns.)
-    Map<String, String> props = providedSchema.properties();
-    if (props == null) {
-      return new StandardConversions(ColumnMetadata.BLANK_AS_NULL);
-    } else {
-      props = new HashMap<>(props);
-      props.put(ColumnMetadata.BLANK_AS_PROP, ColumnMetadata.BLANK_AS_NULL);
-      return new StandardConversions(props);
-    }
+    return StandardConversions.builder()
+      .withSchema(providedSchema)
+      .blankAs(ColumnMetadata.BLANK_AS_NULL)
+      .build();
   }
 
   /**
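
The text reader change above shows the new StandardConversions builder in its fullest form: a (possibly null) provided schema plus a blank-handling policy. A minimal sketch; providedSchema, rowWriter, and the column name "amount" are assumptions for illustration.

StandardConversions conversions = StandardConversions.builder()
    .withSchema(providedSchema)                    // may be null; the builder tolerates it
    .blankAs(ColumnMetadata.BLANK_AS_NULL)
    .build();
// Blank inputs are then handled per BLANK_AS_NULL for nullable non-string columns.
ValueWriter amountWriter =
    conversions.converterFor(rowWriter.scalar("amount"), MinorType.VARCHAR);
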
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
index f8c512e..99efd76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -90,7 +90,7 @@
       writers = new ValueWriter[readerSchema.size()];
       for (int i = 0; i < writers.length; i++) {
         ColumnMetadata colSchema = readerSchema.metadata(i);
-        writers[i] = conversions.converter(rowWriter.scalar(i), colSchema);
+        writers[i] = conversions.converterFor(rowWriter.scalar(i), colSchema);
       }
     }
 
@@ -178,9 +178,9 @@
       if (saveMatchedRows) {
         // Save using the defined columns
         TupleMetadata providedSchema = config.providedSchema;
-        StandardConversions conversions = new StandardConversions(
-            providedSchema == null || !providedSchema.hasProperties() ?
-                null : providedSchema.properties());
+        StandardConversions conversions = StandardConversions.builder()
+            .withSchema(providedSchema)
+            .build();
         vectorWriter = new ScalarGroupWriter(writer, config.readerSchema, conversions);
       }
     }

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
index 4f16e76..bfdff94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
@@ -45,14 +45,23 @@
 
 public class ScanTestUtils {
 
-  // Default file metadata column names; primarily for testing.
+  // Default implicit file column names; primarily for testing.
 
   public static final String FILE_NAME_COL = "filename";
   public static final String FULLY_QUALIFIED_NAME_COL = "fqn";
   public static final String FILE_PATH_COL = "filepath";
   public static final String SUFFIX_COL = "suffix";
   public static final String PARTITION_COL = "dir";
+
+  // Default internal implicit column names; primarily for testing.
+
   public static final String LAST_MODIFIED_TIME_COL = "lmt";
+  public static final String ROW_GROUP_INDEX_COL = "rgi";
+  public static final String ROW_GROUP_START_COL = "rgs";
+  public static final String ROW_GROUP_LENGTH_COL = "rgl";
+
+  // Yes, the following two constants intentionally have the same value.
+  public static final String USE_METADATA_COL = "$project_metadata$";
   public static final String PROJECT_METADATA_COL = "$project_metadata$";
 
   public static abstract class ScanFixtureBuilder {
@@ -71,8 +80,12 @@
       builder().projection(RowSetTestUtils.projectAll());
     }
 
-    public void projectAllWithMetadata(int dirs) {
-      builder().projection(ScanTestUtils.projectAllWithMetadata(dirs));
+    public void projectAllWithFileImplicit(int dirs) {
+      builder().projection(ScanTestUtils.projectAllWithFileImplicit(dirs));
+    }
+
+    public void projectAllWithAllImplicit(int dirs) {
+      builder().projection(ScanTestUtils.projectAllWithAllImplicit(dirs));
     }
 
     public void setProjection(String... projCols) {
@@ -101,7 +114,7 @@
 
   public static class ScanFixture {
 
-    private OperatorContext opContext;
+    private final OperatorContext opContext;
     public ScanOperatorExec scanOp;
 
     public ScanFixture(OperatorContext opContext, ScanOperatorExec scanOp) {
@@ -152,7 +165,7 @@
    * @return schema with the metadata columns appended to the table columns
    */
 
-  public static TupleMetadata expandMetadata(TupleMetadata base, ImplicitColumnManager metadataProj, int dirCount) {
+  public static TupleMetadata expandImplicit(TupleMetadata base, ImplicitColumnManager metadataProj, int dirCount) {
     TupleMetadata metadataSchema = new TupleSchema();
     for (ColumnMetadata col : base) {
       metadataSchema.addColumn(col);
@@ -188,24 +201,32 @@
     return schema;
   }
 
-  public static List<SchemaPath> expandMetadata(int dirCount) {
+  public static List<String> expandImplicit(boolean includeInternal, int dirCount) {
     List<String> selected = Lists.newArrayList(
         FULLY_QUALIFIED_NAME_COL,
         FILE_PATH_COL,
         FILE_NAME_COL,
-        SUFFIX_COL,
-        LAST_MODIFIED_TIME_COL,
-        PROJECT_METADATA_COL);
+        SUFFIX_COL);
 
+    if (includeInternal) {
+      selected.add(LAST_MODIFIED_TIME_COL);
+      selected.add(PROJECT_METADATA_COL);
+    }
     for (int i = 0; i < dirCount; i++) {
       selected.add(PARTITION_COL + i);
     }
-    return RowSetTestUtils.projectList(selected);
+    return selected;
   }
 
-  public static List<SchemaPath> projectAllWithMetadata(int dirCount) {
+  public static List<SchemaPath> projectAllWithFileImplicit(int dirCount) {
     return RowSetTestUtils.concat(
         RowSetTestUtils.projectAll(),
-        expandMetadata(dirCount));
+        RowSetTestUtils.projectList(expandImplicit(false, dirCount)));
+  }
+
+  public static List<SchemaPath> projectAllWithAllImplicit(int dirCount) {
+    return RowSetTestUtils.concat(
+        RowSetTestUtils.projectAll(),
+        RowSetTestUtils.projectList(expandImplicit(true, dirCount)));
   }
 }
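
For reference, the two renamed helpers above produce the following projection lists for a scan with two partition directories (this is simply a reading of the code shown in the diff):

List<SchemaPath> fileOnly = ScanTestUtils.projectAllWithFileImplicit(2);
// -> `**`, fqn, filepath, filename, suffix, dir0, dir1
List<SchemaPath> withInternal = ScanTestUtils.projectAllWithAllImplicit(2);
// -> `**`, fqn, filepath, filename, suffix, lmt, $project_metadata$, dir0, dir1
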
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index d4d806a..2efbe2a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -308,7 +308,7 @@
 
     // Create the scan operator
     FileScanFixtureBuilder builder = new FileScanFixtureBuilder();
-    builder.projectAllWithMetadata(2);
+    builder.projectAllWithAllImplicit(2);
     builder.addReader(reader);
     ScanFixture scanFixture = builder.build();
     ScanOperatorExec scan = scanFixture.scanOp;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
index 65646bc..87ea958 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOuputSchema.java
@@ -82,6 +82,7 @@
     private void buildWriters(TupleMetadata providedSchema,
         TupleMetadata schema) {
       RowSetLoader rowWriter = tableLoader.writer();
+      StandardConversions conversions = StandardConversions.builder().build();
       for (int i = 0; i < schema.size(); i++) {
         ColumnMetadata colSchema = schema.metadata(i);
         String colName = colSchema.name();
@@ -96,7 +97,7 @@
           writers.add(colSchema.name(), rowWriter.scalar(colIndex));
         } else {
           writers.add(colSchema.name(),
-              StandardConversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
+              conversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
         }
       }
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java
index f57cc5c..36fa31a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOrchestratorImplicitColumns.java
@@ -131,7 +131,7 @@
 
     // Verify
 
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
 
     String fqn = ImplicitFileColumns.FQN.getValue(filePath);
     String filePathValue = ImplicitFileColumns.FILEPATH.getValue(filePath);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
index 808447d..aa106c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/convert/TestDirectConverter.java
@@ -32,6 +32,7 @@
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions.ConversionDefn;
 import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions.ConversionType;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -70,13 +71,13 @@
     }
 
     public ConversionTestFixture withProperties(Map<String,String> props) {
-      conversions = new StandardConversions(props);
+      conversions = StandardConversions.builder().withProperties(props).build();
       return this;
     }
 
     private StandardConversions conversions() {
       if (conversions == null) {
-        conversions = new StandardConversions();
+        conversions = StandardConversions.builder().build();
       }
       return conversions;
     }
@@ -92,7 +93,7 @@
 
       // Test uses simple row writer; no support for adding columns.
       assertNotNull(colWriter);
-      ValueWriter converter = conversions().converter(colWriter, source);
+      ValueWriter converter = conversions().converterFor(colWriter, source);
       assertNotNull(converter);
       rowFormat.add(source.name(), converter);
     }
@@ -136,7 +137,7 @@
       .add("d", MinorType.VARCHAR)
       .build();
 
-    TupleMetadata mergedSchema = StandardConversions.mergeSchemas(providedSchema, readerSchema);
+    TupleMetadata mergedSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
     assertTrue(expected.isEquivalent(mergedSchema));
     assertTrue(mergedSchema.booleanProperty("foo"));
   }
@@ -680,6 +681,7 @@
    */
   @Test
   public void testBasicConversionType() {
+    StandardConversions conversions = StandardConversions.builder().build();
     TupleMetadata schema = new SchemaBuilder()
         .add("ti", MinorType.TINYINT)
         .add("si", MinorType.SMALLINT)
@@ -700,84 +702,84 @@
     ColumnMetadata stringCol = schema.metadata("str");
 
     // TinyInt --> x
-    expect(ConversionType.NONE, StandardConversions.analyze(tinyIntCol, tinyIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, smallIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, intCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, bigIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, float4Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, float8Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tinyIntCol, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(tinyIntCol, stringCol));
+    expect(ConversionType.NONE, conversions.analyze(tinyIntCol, tinyIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, smallIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, intCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float4Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, float8Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tinyIntCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(tinyIntCol, stringCol));
 
     // SmallInt --> x
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(smallIntCol, tinyIntCol));
-    expect(ConversionType.NONE, StandardConversions.analyze(smallIntCol, smallIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, intCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, bigIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, float4Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, float8Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(smallIntCol, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(smallIntCol, stringCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(smallIntCol, tinyIntCol));
+    expect(ConversionType.NONE, conversions.analyze(smallIntCol, smallIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, intCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, float4Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, float8Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(smallIntCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(smallIntCol, stringCol));
 
     // Int --> x
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(intCol, tinyIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(intCol, smallIntCol));
-    expect(ConversionType.NONE, StandardConversions.analyze(intCol, intCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, bigIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, float4Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, float8Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(intCol, stringCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(intCol, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(intCol, smallIntCol));
+    expect(ConversionType.NONE, conversions.analyze(intCol, intCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(intCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(intCol, float4Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(intCol, float8Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(intCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(intCol, stringCol));
 
     // BigInt --> x
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, tinyIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, smallIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(bigIntCol, intCol));
-    expect(ConversionType.NONE, StandardConversions.analyze(bigIntCol, bigIntCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, float4Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, float8Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(bigIntCol, stringCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(bigIntCol, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(bigIntCol, smallIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(bigIntCol, intCol));
+    expect(ConversionType.NONE, conversions.analyze(bigIntCol, bigIntCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, float4Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, float8Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(bigIntCol, stringCol));
 
     // Float4 --> x
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, tinyIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, smallIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, intCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float4Col, bigIntCol));
-    expect(ConversionType.NONE, StandardConversions.analyze(float4Col, float4Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(float4Col, float8Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(float4Col, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(float4Col, stringCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, smallIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, intCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float4Col, bigIntCol));
+    expect(ConversionType.NONE, conversions.analyze(float4Col, float4Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(float4Col, float8Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(float4Col, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(float4Col, stringCol));
 
     // Float8 --> x
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, tinyIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, smallIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, intCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, bigIntCol));
-    expect(ConversionType.IMPLICIT_UNSAFE, StandardConversions.analyze(float8Col, float4Col));
-    expect(ConversionType.NONE, StandardConversions.analyze(float8Col, float8Col));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(float8Col, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(float8Col, stringCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, tinyIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, smallIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, intCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, bigIntCol));
+    expect(ConversionType.IMPLICIT_UNSAFE, conversions.analyze(float8Col, float4Col));
+    expect(ConversionType.NONE, conversions.analyze(float8Col, float8Col));
+    expect(ConversionType.IMPLICIT, conversions.analyze(float8Col, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(float8Col, stringCol));
 
     // Decimal --> x
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, tinyIntCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, smallIntCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, intCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, bigIntCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, float4Col));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, float8Col));
-    expect(ConversionType.NONE, StandardConversions.analyze(decimalCol, decimalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(decimalCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, tinyIntCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, smallIntCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, intCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, bigIntCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, float4Col));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, float8Col));
+    expect(ConversionType.NONE, conversions.analyze(decimalCol, decimalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(decimalCol, stringCol));
 
     // VarChar --> x
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, tinyIntCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, smallIntCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, intCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, bigIntCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, float4Col));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, float8Col));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, decimalCol));
-    expect(ConversionType.NONE, StandardConversions.analyze(stringCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, tinyIntCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, smallIntCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, intCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, bigIntCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, float4Col));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, float8Col));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, decimalCol));
+    expect(ConversionType.NONE, conversions.analyze(stringCol, stringCol));
   }
 
   /**
@@ -785,6 +787,7 @@
    */
   @Test
   public void testSpecialConversionType() {
+    StandardConversions conversions = StandardConversions.builder().build();
     TupleMetadata schema = new SchemaBuilder()
         .add("time", MinorType.TIME)
         .add("date", MinorType.DATE)
@@ -807,40 +810,40 @@
     ColumnMetadata stringCol = schema.metadata("str");
 
     // TIME
-    expect(ConversionType.NONE, StandardConversions.analyze(timeCol, timeCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(timeCol, stringCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, timeCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(intCol, timeCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(timeCol, intCol));
+    expect(ConversionType.NONE, conversions.analyze(timeCol, timeCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(timeCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, timeCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(intCol, timeCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(timeCol, intCol));
 
     // DATE
-    expect(ConversionType.NONE, StandardConversions.analyze(dateCol, dateCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(dateCol, stringCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, dateCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, dateCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(dateCol, bigIntCol));
+    expect(ConversionType.NONE, conversions.analyze(dateCol, dateCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(dateCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, dateCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, dateCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(dateCol, bigIntCol));
 
     // TIMESTAMP
-    expect(ConversionType.NONE, StandardConversions.analyze(tsCol, tsCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(tsCol, stringCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, tsCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(bigIntCol, tsCol));
-    expect(ConversionType.IMPLICIT, StandardConversions.analyze(tsCol, bigIntCol));
+    expect(ConversionType.NONE, conversions.analyze(tsCol, tsCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(tsCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, tsCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(bigIntCol, tsCol));
+    expect(ConversionType.IMPLICIT, conversions.analyze(tsCol, bigIntCol));
 
     // INTERVAL
-    expect(ConversionType.NONE, StandardConversions.analyze(intervalCol, intervalCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(intervalCol, stringCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, intervalCol));
+    expect(ConversionType.NONE, conversions.analyze(intervalCol, intervalCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(intervalCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, intervalCol));
 
     // INTERVALYEAR
-    expect(ConversionType.NONE, StandardConversions.analyze(yearCol, yearCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(yearCol, stringCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, yearCol));
+    expect(ConversionType.NONE, conversions.analyze(yearCol, yearCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(yearCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, yearCol));
 
     // INTERVALDAY
-    expect(ConversionType.NONE, StandardConversions.analyze(dayCol, dayCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(dayCol, stringCol));
-    expect(ConversionType.EXPLICIT, StandardConversions.analyze(stringCol, dayCol));
+    expect(ConversionType.NONE, conversions.analyze(dayCol, dayCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(dayCol, stringCol));
+    expect(ConversionType.EXPLICIT, conversions.analyze(stringCol, dayCol));
   }
 
   /**
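
As the test updates above show, analyze() is now an instance method, so callers build a StandardConversions first. A minimal sketch; the ConversionDefn return type is inferred from the test imports and is an assumption here.

StandardConversions conversions = StandardConversions.builder().build();
TupleMetadata schema = new SchemaBuilder()
    .add("i", MinorType.INT)
    .add("bi", MinorType.BIGINT)
    .buildSchema();
// INT -> BIGINT appears above as an IMPLICIT (widening) conversion.
ConversionDefn defn = conversions.analyze(schema.metadata("i"), schema.metadata("bi"));
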
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
index f4cee33..c5d92fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -24,7 +24,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -47,7 +47,7 @@
  * Test the level of projection done at the level of the scan as a whole;
  * before knowledge of table "implicit" columns or the specific table schema.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanLevelProjection extends SubOperatorTest {
 
   private boolean isProjected(ProjectionFilter filter, ColumnMetadata col) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
index ea55bc9..842f18e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -87,7 +87,6 @@
  * because such an algorithm would require time-travel: looking into
  * the future to know what data will be scanned.)
  */
-
 @Category(RowSetTests.class)
 public class TestSchemaSmoothing extends SubOperatorTest {
 
@@ -103,12 +102,10 @@
    * discrete is just to run the basic lifecycle in a way that
    * is compatible with the schema-persistence version.
    */
-
   @Test
   public void testDiscrete() {
 
     // Set up the file metadata manager
-
     Path filePathA = new Path("hdfs:///w/x/y/a.csv");
     Path filePathB = new Path("hdfs:///w/x/y/b.csv");
     ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -116,18 +113,15 @@
         standardOptions(Lists.newArrayList(filePathA, filePathB)));
 
     // Set up the scan level projection
-
     ScanLevelProjection scanProj = ScanLevelProjection.build(
         RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
     {
       // Define a file a.csv
-
       metadataManager.startFile(filePathA);
 
       // Build the output schema from the (a, b) table schema
-
       TupleMetadata twoColSchema = new SchemaBuilder()
           .add("a", MinorType.INT)
           .addNullable("b", MinorType.VARCHAR, 10)
@@ -139,7 +133,6 @@
           ScanTestUtils.resolvers(metadataManager));
 
       // Verify the full output schema
-
       TupleMetadata expectedSchema = new SchemaBuilder()
           .add("filename", MinorType.VARCHAR)
           .add("a", MinorType.INT)
@@ -147,21 +140,18 @@
           .buildSchema();
 
       // Verify
-
       List<ResolvedColumn> columns = rootTuple.columns();
       assertEquals(3, columns.size());
       assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
       assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
       assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
       assertTrue(columns.get(1) instanceof ResolvedTableColumn);
-   }
+    }
     {
       // Define a file b.csv
-
       metadataManager.startFile(filePathB);
 
       // Build the output schema from the (a) table schema
-
       TupleMetadata oneColSchema = new SchemaBuilder()
           .add("a", MinorType.INT)
           .buildSchema();
@@ -177,7 +167,6 @@
       // vector level as part of vector persistence.) During projection, it is
       // marked with type NULL so that the null column builder will fill in
       // the proper type.
-
       TupleMetadata expectedSchema = new SchemaBuilder()
           .add("filename", MinorType.VARCHAR)
           .add("a", MinorType.INT)
@@ -185,7 +174,6 @@
           .buildSchema();
 
       // Verify
-
       List<ResolvedColumn> columns = rootTuple.columns();
       assertEquals(3, columns.size());
       assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
@@ -200,7 +188,6 @@
    * Low-level test of the smoothing projection, including the exceptions
    * it throws when things are not going its way.
    */
-
   @Test
   public void testSmoothingProjection() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -208,7 +195,6 @@
         ScanTestUtils.parsers());
 
     // Table 1: (a: nullable bigint, b)
-
     final TupleMetadata schema1 = new SchemaBuilder()
         .addNullable("a", MinorType.BIGINT)
         .addNullable("b", MinorType.VARCHAR)
@@ -225,7 +211,6 @@
     }
 
     // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
-
     final TupleMetadata schema2 = new SchemaBuilder()
         .addNullable("a", MinorType.BIGINT)
         .add("c", MinorType.FLOAT8)
@@ -243,7 +228,6 @@
     }
 
     // Table 3: (a, c, d), column added, must replan schema
-
     final TupleMetadata schema3 = new SchemaBuilder()
         .addNullable("a", MinorType.BIGINT)
         .addNullable("b", MinorType.VARCHAR)
@@ -262,7 +246,6 @@
     }
 
     // Table 4: (a: double), change type must replan schema
-
     final TupleMetadata schema4 = new SchemaBuilder()
         .addNullable("a", MinorType.FLOAT8)
         .addNullable("b", MinorType.VARCHAR)
@@ -280,7 +263,6 @@
     }
 
     // Table 5: Drop a non-nullable column, must replan
-
     final TupleMetadata schema6 = new SchemaBuilder()
         .addNullable("a", MinorType.BIGINT)
         .addNullable("b", MinorType.VARCHAR)
@@ -302,7 +284,6 @@
    * schema. Discard prior schema. Turn off auto expansion of
    * metadata for a simpler test.
    */
-
   @Test
   public void testSmaller() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -340,7 +321,6 @@
    * Case in which the table schema and prior are disjoint
    * sets. Discard the prior schema.
    */
-
   @Test
   public void testDisjoint() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -377,7 +357,6 @@
   /**
    * Column names match, but types differ. Discard the prior schema.
    */
-
   @Test
   public void testDifferentTypes() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -411,7 +390,6 @@
    * schema (though, the output is no different than if we discarded
    * the prior schema...)
    */
-
   @Test
   public void testSameSchemas() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -446,7 +424,6 @@
    * The prior and table schemas are identical, but the cases of names differ.
    * Preserve the case of the first schema.
    */
-
   @Test
   public void testDifferentCase() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -481,7 +458,6 @@
    * Can't preserve the prior schema if it had required columns
    * where the new schema has no columns.
    */
-
   @Test
   public void testRequired() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -513,7 +489,6 @@
    * Preserve the prior schema if table is a subset and missing columns
    * are nullable or repeated.
    */
-
   @Test
   public void testMissingNullableColumns() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -546,7 +521,6 @@
    * Preserve the prior schema if table is a subset. Map the table
    * columns to the output using the prior schema ordering.
    */
-
   @Test
   public void testReordering() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
@@ -580,12 +554,10 @@
    * If using the legacy wildcard expansion, reuse schema if partition paths
    * are the same length.
    */
-
   @Test
   public void testSamePartitionLength() {
 
     // Set up the file metadata manager
-
     Path filePathA = new Path("hdfs:///w/x/y/a.csv");
     Path filePathB = new Path("hdfs:///w/x/y/b.csv");
     ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -593,9 +565,8 @@
         standardOptions(Lists.newArrayList(filePathA, filePathB)));
 
     // Set up the scan level projection
-
     ScanLevelProjection scanProj = ScanLevelProjection.build(
-        ScanTestUtils.projectAllWithMetadata(2),
+        ScanTestUtils.projectAllWithAllImplicit(2),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
     // Define the schema smoother
@@ -608,7 +579,7 @@
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
     {
       metadataManager.startFile(filePathA);
       ResolvedRow rootTuple = doResolve(smoother, tableSchema);
@@ -627,12 +598,10 @@
    * is shorter than the previous. (Unneeded partitions will be set to null by the
    * scan projector.)
    */
-
   @Test
   public void testShorterPartitionLength() {
 
     // Set up the file metadata manager
-
     Path filePathA = new Path("hdfs:///w/x/y/a.csv");
     Path filePathB = new Path("hdfs:///w/x/b.csv");
     ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -640,13 +609,11 @@
         standardOptions(Lists.newArrayList(filePathA, filePathB)));
 
     // Set up the scan level projection
-
     ScanLevelProjection scanProj = ScanLevelProjection.build(
-        ScanTestUtils.projectAllWithMetadata(2),
+        ScanTestUtils.projectAllWithAllImplicit(2),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
     // Define the schema smoother
-
     SchemaSmoother smoother = new SchemaSmoother(scanProj,
         ScanTestUtils.resolvers(metadataManager));
 
@@ -655,7 +622,7 @@
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
     {
       metadataManager.startFile(filePathA);
       ResolvedRow rootTuple = doResolve(smoother, tableSchema);
@@ -674,12 +641,10 @@
    * schema even if the new partition path is longer than the previous.
    * Because all file names are provided up front.
    */
-
   @Test
   public void testLongerPartitionLength() {
 
     // Set up the file metadata manager
-
     Path filePathA = new Path("hdfs:///w/x/a.csv");
     Path filePathB = new Path("hdfs:///w/x/y/b.csv");
     ImplicitColumnManager metadataManager = new ImplicitColumnManager(
@@ -687,13 +652,11 @@
         standardOptions(Lists.newArrayList(filePathA, filePathB)));
 
     // Set up the scan level projection
-
     ScanLevelProjection scanProj = ScanLevelProjection.build(
-        ScanTestUtils.projectAllWithMetadata(2),
+        ScanTestUtils.projectAllWithAllImplicit(2),
         ScanTestUtils.parsers(metadataManager.projectionParser()));
 
     // Define the schema smoother
-
     SchemaSmoother smoother = new SchemaSmoother(scanProj,
         ScanTestUtils.resolvers(metadataManager));
 
@@ -702,7 +665,7 @@
         .add("b", MinorType.VARCHAR)
         .buildSchema();
 
-    TupleMetadata expectedSchema = ScanTestUtils.expandMetadata(tableSchema, metadataManager, 2);
+    TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
     {
       metadataManager.startFile(filePathA);
       ResolvedRow rootTuple = doResolve(smoother, tableSchema);
@@ -719,7 +682,6 @@
   /**
    * Integrated test across multiple schemas at the batch level.
    */
-
   @Test
   public void testSmoothableSchemaBatches() {
     final ScanLevelProjection scanProj = ScanLevelProjection.build(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestFixedReceiver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestFixedReceiver.java
new file mode 100644
index 0000000..ef4dcd3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestFixedReceiver.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver.Builder;
+import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.Test;
+
+public class TestFixedReceiver extends BaseScanTest {
+
+  /**
+   * Mock reader which writes columns as strings, using
+   * standard conversions to convert to different types as
+   * specified in a provided schema.
+   */
+  private static class MockReader implements ManagedReader {
+
+    private final FixedReceiver receiver;
+
+    public MockReader(SchemaNegotiator negotiator) {
+      Builder builder = FixedReceiver.builderFor(negotiator)
+          .schemaIsComplete();
+      TupleMetadata readerSchema = new SchemaBuilder()
+          .add("ti", MinorType.VARCHAR)
+          .add("si", MinorType.VARCHAR)
+          .add("int", MinorType.VARCHAR)
+          .add("bi", MinorType.VARCHAR)
+          .add("fl", MinorType.VARCHAR)
+          .add("db", MinorType.VARCHAR)
+          .buildSchema();
+      receiver = builder.build(readerSchema);
+    }
+
+    @Override
+    public boolean next() {
+      if (receiver.rowWriter().loader().batchCount() > 0) {
+        return false;
+      }
+      receiver.start();
+      receiver.scalar(0).setString("11");
+      receiver.scalar(1).setString("12");
+      receiver.scalar(2).setString("13");
+      receiver.scalar(3).setString("14");
+      receiver.scalar(4).setString("15.5");
+      receiver.scalar(5).setString("16.25");
+      receiver.save();
+
+      receiver.start();
+      receiver.scalar("ti").setString("127");
+      receiver.scalar("si").setString("32757");
+      receiver.scalar("int").setString(Integer.toString(Integer.MAX_VALUE));
+      receiver.scalar("bi").setString(Long.toString(Long.MAX_VALUE));
+      receiver.scalar("fl").setString("10E6");
+      receiver.scalar("db").setString("10E200");
+      receiver.save();
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * Test the standard string-to-type conversion using an ad-hoc conversion
+   * from the input type (the type used by the row set builder) to the output
+   * (vector) type.
+   */
+  @Test
+  public void testFixedReceiver() {
+
+    // Create the provided and output schemas
+    TupleMetadata outputSchema = new SchemaBuilder()
+        .add("ti", MinorType.TINYINT)
+        .add("si", MinorType.SMALLINT)
+        .add("int", MinorType.INT)
+        .add("bi", MinorType.BIGINT)
+        .add("fl", MinorType.FLOAT4)
+        .add("db", MinorType.FLOAT8)
+        .buildSchema();
+
+    // Create the scan
+    BaseScanFixtureBuilder builder = simpleBuilder(negotiator -> new MockReader(negotiator));
+    builder.builder.providedSchema(outputSchema);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    // Load test data using converters
+    assertTrue(scan.next());
+
+    // Build the expected vector without a type converter.
+    final SingleRowSet expected = fixture.rowSetBuilder(outputSchema)
+        .addRow(11, 12, 13, 14L, 15.5F, 16.25D)
+        .addRow(127, 32757, Integer.MAX_VALUE, Long.MAX_VALUE, 10E6F, 10E200D)
+        .build();
+
+    // Compare
+    VectorContainer container = scan.batchAccessor().container();
+    RowSetUtilities.verify(expected, fixture.wrap(container));
+    scanFixture.close();
+  }
+}
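
The new TestFixedReceiver test above shows the intended use of FixedReceiver: the reader declares the schema it can actually produce (all VARCHAR here), the provided schema supplies the output types, and standard conversions bridge the two on each setString call. The sketch below distills just the reader-side calls; it assumes the same package and imports as the test above, and the class and column names are illustrative only, not framework code.

    // Minimal sketch of a FixedReceiver-based reader (assumed names).
    public class StringSourceReader implements ManagedReader {
      private final FixedReceiver receiver;

      public StringSourceReader(SchemaNegotiator negotiator) {
        // The schema this reader can physically produce: text columns.
        TupleMetadata readerSchema = new SchemaBuilder()
            .add("amount", MinorType.VARCHAR)
            .buildSchema();
        receiver = FixedReceiver.builderFor(negotiator)
            .schemaIsComplete()       // no columns will be added later
            .build(readerSchema);
      }

      @Override
      public boolean next() {
        // One batch is enough for this sketch.
        if (receiver.rowWriter().loader().batchCount() > 0) {
          return false;
        }
        receiver.start();
        // Written as text; converted to the provided schema's type, if one is given.
        receiver.scalar("amount").setString("12.50");
        receiver.save();
        return true;
      }

      @Override
      public void close() { }
    }

As in the test, such a reader would be handed to the scan through a reader factory of the form negotiator -> new StringSourceReader(negotiator).
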
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java
index 2886d26..cba3a79 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanBasics.java
@@ -23,7 +23,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
@@ -38,7 +38,7 @@
  * Tests the basics of the scan operator protocol: error conditions,
  * etc.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanBasics extends BaseScanTest {
   private static final Logger logger = LoggerFactory.getLogger(TestScanBasics.class);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java
index c0fbbfb..b68dc16 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanEarlySchema.java
@@ -21,7 +21,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
 import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -34,7 +34,7 @@
  * Test "early schema" readers: those that can declare a schema at
  * open time.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanEarlySchema extends BaseScanTest {
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java
index cdbc3b1..cfd3d09 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLateSchema.java
@@ -22,7 +22,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
@@ -40,7 +40,7 @@
  * Test "late schema" readers: those like JSON that discover their schema
  * as they read data.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanLateSchema extends BaseScanTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
index 9d94e9f..7b14c41 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOuputSchema.java
@@ -20,7 +20,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
@@ -43,7 +43,7 @@
  * defines the schema to be output from the scan operator, and forces
  * conversions between reader and output data types.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanOuputSchema extends BaseScanTest {
 
   private static class MockSimpleReader implements ManagedReader {
@@ -78,6 +78,7 @@
     private void buildWriters(TupleMetadata providedSchema,
         TupleMetadata schema) {
       RowSetLoader rowWriter = tableLoader.writer();
+      StandardConversions conversions = StandardConversions.builder().build();
       for (int i = 0; i < schema.size(); i++) {
         ColumnMetadata colSchema = schema.metadata(i);
         String colName = colSchema.name();
@@ -92,7 +93,7 @@
           writers.add(colSchema.name(), rowWriter.scalar(colIndex));
         } else {
           writers.add(colSchema.name(),
-              StandardConversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
+              conversions.converterFor(rowWriter.scalar(colSchema.name()), colSchema));
         }
       }
     }
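
The hunk above reflects the switch from static StandardConversions calls to a builder-created instance: build the conversions object once, then ask it for a converter per column. A minimal sketch of that pattern follows; rowWriter (a RowSetLoader) and colSchema (the target ColumnMetadata) are assumed to already be in scope, as they are in the test.

    // Create the conversion factory once per reader.
    StandardConversions conversions = StandardConversions.builder().build();

    // For a column whose input is text but whose output type comes from colSchema:
    int colIndex = rowWriter.addColumn(colSchema);
    ValueWriter converter = conversions.converterFor(rowWriter.scalar(colIndex), colSchema);

    // Later, per row: the string is parsed into colSchema's declared type.
    converter.setString("123");
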
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java
index efdcbbf..d813ec9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanOverflow.java
@@ -23,7 +23,7 @@
 
 import java.util.Arrays;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
@@ -35,7 +35,7 @@
 /**
  * Test vector overflow in the context of the scan operator.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanOverflow extends BaseScanTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java
index ad3dc01..e35fc39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/MockFileNames.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl.scan.v3.file;
 
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver;
 import org.apache.hadoop.fs.Path;
 
 public interface MockFileNames {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java
index 24563c1..0dfdd9e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileDescrip.java
@@ -18,45 +18,41 @@
 package org.apache.drill.exec.physical.impl.scan.v3.file;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import java.io.IOException;
+
+import org.apache.drill.categories.EvfTest;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
 import org.apache.drill.test.BaseTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestFileDescrip extends BaseTest {
 
-  /**
-   * Degenerate case: no file or root
-   */
-  @Test
-  public void testEmpty() {
-    FileDescrip fd = new FileDescrip(null, null);
-    assertFalse(fd.isSet());
-    assertNull(fd.filePath());
-    assertEquals(0, fd.dirPathLength());
-    assertNull(fd.partition(0));
+  private static final DrillFileSystem dfs;
+
+  static {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+    try {
+      dfs = new DrillFileSystem(conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
   }
 
-  /**
-   * Degenerate case: no file path, but with as selection root
-   * Should never occur in practice.
-   */
-  @Test
-  public void testNoPath() {
-    Path root = new Path("hdfs://a/b");
-    FileDescrip fd = new FileDescrip(null, root);
-    assertFalse(fd.isSet());
-    assertNull(fd.filePath());
-    assertEquals(0, fd.dirPathLength());
-    assertNull(fd.partition(0));
+  private FileDescrip fileDescrip(Path input, Path root) {
+    return new FileDescrip(dfs,
+        new FileWorkImpl(0, 1000, input),
+        root);
   }
 
   /**
@@ -65,9 +61,8 @@
    */
   @Test
   public void testNoRoot() {
-    Path input = new Path("hdfs://foo.csv");
-    FileDescrip fd = new FileDescrip(input, null);
-    assertTrue(fd.isSet());
+    Path input = new Path("file:///foo.csv");
+    FileDescrip fd = fileDescrip(input, null);
     assertSame(input, fd.filePath());
     assertEquals(0, fd.dirPathLength());
     assertNull(fd.partition(0));
@@ -78,9 +73,8 @@
    */
   @Test
   public void testSingleFile() {
-    Path input = new Path("hdfs://a/b/c/foo.csv");
-    FileDescrip fd = new FileDescrip(input, null);
-    assertTrue(fd.isSet());
+    Path input = new Path("file:///a/b/c/foo.csv");
+    FileDescrip fd = fileDescrip(input, null);
     assertSame(input, fd.filePath());
     assertEquals(0, fd.dirPathLength());
     assertNull(fd.partition(0));
@@ -91,10 +85,9 @@
    */
   @Test
   public void testRootFile() {
-    Path root = new Path("hdfs://a/b");
-    Path input = new Path("hdfs://a/b/foo.csv");
-    FileDescrip fd = new FileDescrip(input, root);
-    assertTrue(fd.isSet());
+    Path root = new Path("file:///a/b");
+    Path input = new Path("file:///a/b/foo.csv");
+    FileDescrip fd = fileDescrip(input, root);
     assertSame(input, fd.filePath());
     assertEquals(0, fd.dirPathLength());
     assertNull(fd.partition(0));
@@ -105,10 +98,9 @@
    */
   @Test
   public void testBelowRoot() {
-    Path root = new Path("hdfs://a/b");
-    Path input = new Path("hdfs://a/b/c/foo.csv");
-    FileDescrip fd = new FileDescrip(input, root);
-    assertTrue(fd.isSet());
+    Path root = new Path("file:///a/b");
+    Path input = new Path("file:///a/b/c/foo.csv");
+    FileDescrip fd = fileDescrip(input, root);
     assertSame(input, fd.filePath());
     assertEquals(1, fd.dirPathLength());
     assertEquals("c", fd.partition(0));
@@ -121,10 +113,10 @@
    */
   @Test
   public void testAboveRoot() {
-    Path root = new Path("hdfs://a/b");
-    Path input = new Path("hdfs://a/foo.csv");
+    Path root = new Path("file:///a/b");
+    Path input = new Path("file:///a/foo.csv");
     try {
-      new FileDescrip(input, root);
+      fileDescrip(input, root);
       fail();
     } catch (IllegalArgumentException e) {
       // Expected
@@ -137,10 +129,10 @@
    */
   @Test
   public void testDisjointPath() {
-    Path root = new Path("hdfs://a/b");
-    Path input = new Path("hdfs://d/foo.csv");
+    Path root = new Path("file:///a/b");
+    Path input = new Path("file:///d/foo.csv");
     try {
-      new FileDescrip(input, root);
+      fileDescrip(input, root);
       fail();
     } catch (IllegalArgumentException e) {
       // Expected
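
As rewritten above, TestFileDescrip no longer builds a FileDescrip from two Paths: the constructor now takes the DrillFileSystem, a FileWork describing the split, and the selection root. A condensed sketch of the new construction; the local-file-system configuration and the 0/1000 split offsets mirror the test and are placeholders only.

    // Build a DrillFileSystem over the local file system.
    Configuration conf = new Configuration();
    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
    DrillFileSystem dfs = new DrillFileSystem(conf);   // may throw IOException

    // Describe one file relative to a selection root.
    Path root = new Path("file:///a/b");
    Path input = new Path("file:///a/b/c/foo.csv");
    FileDescrip fd = new FileDescrip(dfs, new FileWorkImpl(0, 1000, input), root);

    fd.filePath();       // the full input path
    fd.dirPathLength();  // 1: one directory level ("c") below the root
    fd.partition(0);     // "c"
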
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java
index 1f45f6b..9f7d4c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScan.java
@@ -21,7 +21,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -43,7 +43,7 @@
  * Focuses on the file implicit columns, assumes that other tests have
  * verified the underlying mechanisms.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestFileScan extends BaseFileScanTest {
 
   /**
@@ -57,8 +57,9 @@
     public MockLateSchemaReader(FileSchemaNegotiator negotiator) {
 
       // Verify the path
-      assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.fileWork().getPath().toString());
-      assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.split().getPath().toString());
+      assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.file().filePath().toString());
+      assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.file().fileWork().getPath().toString());
+      assertEquals(MOCK_FILE_SYSTEM_NAME, negotiator.file().split().getPath().toString());
 
       // No schema or file, just build the table loader.
       tableLoader = negotiator.build();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java
index 2b6f0cf..00931db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestFileScanLifecycle.java
@@ -25,7 +25,7 @@
 
 import java.util.Collections;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
@@ -43,7 +43,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestFileScanLifecycle extends BaseTestScanLifecycle implements MockFileNames {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java
index 4b64ef5..e064a20 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnLoader.java
@@ -20,13 +20,14 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.StaticBatchBuilder;
-import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.BaseTestScanLifecycle.DummySubScan;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaConfigBuilder;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker;
 import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
@@ -35,8 +36,13 @@
 import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -45,19 +51,52 @@
  * Tests the file implicit column handler which identifies implicit columns
  * and populates them. Assumes that the implicit column parser tests pass.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestImplicitColumnLoader extends SubOperatorTest implements MockFileNames {
 
+  private static final DrillFileSystem dfs;
+
+  static {
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+    try {
+      dfs = new DrillFileSystem(conf);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static class ImplicitFixture {
+    final FileScanLifecycleBuilder options;
+    final ScanSchemaTracker schemaTracker;
+    FileDescrip fileDescrip;
+    ImplicitFileColumnsHandler handler;
+
+    public ImplicitFixture(List<SchemaPath> projList, Path root) {
+      this.options = new FileScanLifecycleBuilder();
+      options.rootDir(root);
+      this.schemaTracker = new ScanSchemaConfigBuilder()
+          .projection(projList)
+          .build();
+    }
+
+    public void build(Path input) {
+      final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+      handler = new ImplicitFileColumnsHandler(
+          dfs, fixture.getOptionManager(), options, cache, schemaTracker);
+      FileWork fileWork = new FileWorkImpl(0, 1000, input);
+      fileDescrip = handler.makeDescrip(fileWork);
+    }
+
+    public StaticBatchBuilder batchBuilder() {
+      return handler.forFile(fileDescrip);
+    }
+  }
+
   public StaticBatchBuilder buildHandler(List<SchemaPath> projList, Path root, Path input) {
-    FileScanLifecycleBuilder options = new FileScanLifecycleBuilder();
-    options.rootDir(root);
-    final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
-    final ScanSchemaTracker schemaTracker = new ScanSchemaConfigBuilder()
-        .projection(projList)
-        .build();
-    ImplicitFileColumnsHandler handler = new ImplicitFileColumnsHandler(
-        fixture.operatorContext(new DummySubScan()), options, cache, schemaTracker);
-    return handler.forFile(input);
+    ImplicitFixture testFixture = new ImplicitFixture(projList, root);
+    testFixture.build(input);
+    return testFixture.batchBuilder();
   }
 
   @Test
@@ -84,9 +122,9 @@
   }
 
   @Test
-  public void testAllColumns() {
+  public void testNonInternalColumns() {
     StaticBatchBuilder batchLoader = buildHandler(
-        ScanTestUtils.projectAllWithMetadata(3),
+        ScanTestUtils.projectAllWithFileImplicit(3),
         MOCK_ROOT_PATH, MOCK_FILE_PATH);
     assertNotNull(batchLoader);
     batchLoader.load(2);
@@ -106,4 +144,59 @@
         .build();
     RowSetUtilities.verify(expected, fixture.wrap(batchLoader.outputContainer()));
   }
+
+  @Test
+  public void testInternalColumns() {
+    ImplicitFixture testFixture = new ImplicitFixture(
+        RowSetTestUtils.projectList(
+            ScanTestUtils.LAST_MODIFIED_TIME_COL,
+            ScanTestUtils.PROJECT_METADATA_COL,
+            ScanTestUtils.ROW_GROUP_INDEX_COL,
+            ScanTestUtils.ROW_GROUP_START_COL,
+            ScanTestUtils.ROW_GROUP_LENGTH_COL),
+        MOCK_ROOT_PATH);
+    testFixture.build(MOCK_FILE_PATH);
+    testFixture.fileDescrip.setRowGroupAttribs(10, 10_000, 5_000);
+    testFixture.fileDescrip.setModTime("123456789");
+    StaticBatchBuilder batchLoader = testFixture.batchBuilder();
+    assertNotNull(batchLoader);
+    batchLoader.load(2);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add(ScanTestUtils.LAST_MODIFIED_TIME_COL, MinorType.VARCHAR)
+        .addNullable(ScanTestUtils.PROJECT_METADATA_COL, MinorType.VARCHAR)
+        .add(ScanTestUtils.ROW_GROUP_INDEX_COL, MinorType.VARCHAR)
+        .add(ScanTestUtils.ROW_GROUP_START_COL, MinorType.VARCHAR)
+        .add(ScanTestUtils.ROW_GROUP_LENGTH_COL, MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("123456789", null, "10", "10000", "5000")
+        .addRow("123456789", null, "10", "10000", "5000")
+        .build();
+    RowSetUtilities.verify(expected, fixture.wrap(batchLoader.outputContainer()));
+  }
+
+  @Test
+  public void testInternalEmptyFile() {
+    ImplicitFixture testFixture = new ImplicitFixture(
+        RowSetTestUtils.projectList(
+            ScanTestUtils.LAST_MODIFIED_TIME_COL,
+            ScanTestUtils.PROJECT_METADATA_COL),
+        MOCK_ROOT_PATH);
+    testFixture.build(MOCK_FILE_PATH);
+    testFixture.fileDescrip.setModTime("123456789");
+    testFixture.fileDescrip.markEmpty();
+    StaticBatchBuilder batchLoader = testFixture.batchBuilder();
+    assertNotNull(batchLoader);
+    batchLoader.load(1);
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+        .add(ScanTestUtils.LAST_MODIFIED_TIME_COL, MinorType.VARCHAR)
+        .addNullable(ScanTestUtils.PROJECT_METADATA_COL, MinorType.VARCHAR)
+        .build();
+    RowSet expected = fixture.rowSetBuilder(expectedSchema)
+        .addRow("123456789", "false")
+        .build();
+    RowSetUtilities.verify(expected, fixture.wrap(batchLoader.outputContainer()));
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java
index a87e330..f10083d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/file/TestImplicitColumnResolver.java
@@ -22,21 +22,20 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ImplicitColumnOptions;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ParseResult;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.MutableTupleSchema.ColumnHandle;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ImplicitColumnOptions;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ParseResult;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectionSchemaTracker;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
@@ -49,11 +48,14 @@
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.test.SubOperatorTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestImplicitColumnResolver extends SubOperatorTest {
 
   private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
@@ -77,7 +79,7 @@
   }
 
   private boolean isImplicit(List<ColumnHandle> cols, int index) {
-    return SchemaUtils.isImplicit(cols.get(index).column());
+    return cols.get(index).isImplicit();
   }
 
   @Test
@@ -87,6 +89,7 @@
     ParseResult result = parseFixture.parseImplicit();
     assertTrue(result.columns().isEmpty());
     assertTrue(result.schema().isEmpty());
+    assertFalse(result.isMetadataScan());
   }
 
   /**
@@ -103,6 +106,7 @@
             ScanTestUtils.SUFFIX_COL));
     ParseResult result = parseFixture.parseImplicit();
 
+    assertFalse(result.isMetadataScan());
     assertEquals(4, result.columns().size());
 
     TupleMetadata expected = new SchemaBuilder()
@@ -132,6 +136,7 @@
     ParserFixture parseFixture = new ParserFixture(
         RowSetTestUtils.projectList(dir2, dir1, dir0, "a"));
     ParseResult result = parseFixture.parseImplicit();
+    assertFalse(result.isMetadataScan());
     assertEquals(3, result.columns().size());
 
     TupleMetadata expected = new SchemaBuilder()
@@ -156,6 +161,7 @@
         .maxPartitionDepth(3)
         .useLegacyWildcardExpansion(true);
     ParseResult result = parseFixture.parseImplicit();
+    assertFalse(result.isMetadataScan());
     assertEquals(3, result.columns().size());
 
     TupleMetadata expected = new SchemaBuilder()
@@ -535,12 +541,6 @@
     assertTrue(schemaTracker.isResolved());
     assertSame(ProjectionType.SOME, schemaTracker.projectionType());
 
-    // Implicit columns should be marked
-    MutableTupleSchema internalSchema = schemaTracker.internalSchema();
-    assertFalse(internalSchema.find("a").isImplicit());
-    assertTrue(internalSchema.find(ScanTestUtils.FILE_NAME_COL).isImplicit());
-    assertTrue(internalSchema.find(ScanTestUtils.partitionColName(2)).isImplicit());
-
     ImplicitColumnOptions options = new ImplicitColumnOptions()
         .optionSet(fixture.getOptionManager());
     ImplicitColumnResolver parser = new ImplicitColumnResolver(options, ERROR_CONTEXT);
@@ -552,4 +552,63 @@

 
     assertEquals(definedSchema, schemaTracker.outputSchema());
   }
+
+  /**
+   * Test including internal implicit columns in the project list.
+   * @throws IOException if the local Drill file system cannot be created
+   */
+  @Test
+  public void testInternalImplicitColumnSelection() throws IOException {
+    // Simulate SELECT lmt, $project_metadata$, rgi, rgs, rgl ...
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
+    DrillFileSystem dfs = new DrillFileSystem(conf);
+    ParserFixture parseFixture = new ParserFixture(
+        RowSetTestUtils.projectList("a",
+            ScanTestUtils.LAST_MODIFIED_TIME_COL,
+            ScanTestUtils.PROJECT_METADATA_COL,
+            ScanTestUtils.ROW_GROUP_INDEX_COL,
+            ScanTestUtils.ROW_GROUP_START_COL,
+            ScanTestUtils.ROW_GROUP_LENGTH_COL));
+    parseFixture.options.dfs(dfs);
+    ParseResult result = parseFixture.parseImplicit();
+
+    assertTrue(result.isMetadataScan());
+    assertEquals(5, result.columns().size());
+
+    TupleMetadata expected = new SchemaBuilder()
+        .add(ScanTestUtils.LAST_MODIFIED_TIME_COL, MinorType.VARCHAR)
+        .addNullable(ScanTestUtils.PROJECT_METADATA_COL, MinorType.VARCHAR)
+        .add(ScanTestUtils.ROW_GROUP_INDEX_COL, MinorType.VARCHAR)
+        .add(ScanTestUtils.ROW_GROUP_START_COL, MinorType.VARCHAR)
+        .add(ScanTestUtils.ROW_GROUP_LENGTH_COL, MinorType.VARCHAR)
+        .build();
+    assertEquals(expected, result.schema());
+
+    List<ColumnHandle> cols = parseFixture.tracker.internalSchema().columns();
+    assertFalse(isImplicit(cols, 0));
+    assertTrue(isImplicit(cols, 1));
+    assertTrue(isImplicit(cols, 2));
+    assertTrue(isImplicit(cols, 3));
+    assertTrue(isImplicit(cols, 4));
+    assertTrue(isImplicit(cols, 5));
+  }
+
+  @Test
+  public void testProvidedImplicitColInternal() {
+    TupleMetadata providedSchema = new SchemaBuilder()
+        .add("myLmt", MinorType.INT)
+        .build();
+    SchemaUtils.markImplicit(providedSchema.metadata("myLmt"), ScanTestUtils.LAST_MODIFIED_TIME_COL);
+
+    ParserFixture parseFixture = new ParserFixture(
+        RowSetTestUtils.projectAll());
+    parseFixture.tracker.applyProvidedSchema(providedSchema);
+    try {
+      parseFixture.parseImplicit();
+      fail();
+    } catch (UserException e) {
+      assertTrue(e.getMessage().contains("references an undefined implicit column type"));
+    }
+  }
 }
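
The last test above covers a corner case of provided-schema implicit columns: SchemaUtils.markImplicit can tag a provided column as an implicit column, but tagging it with one of the internal column types (here the last-modified time) is rejected, and implicit-column parsing then fails with a UserException whose message includes "references an undefined implicit column type". A compact sketch of the marking step, using the test's own names:

    // Alias a provided column to an implicit column type.
    TupleMetadata providedSchema = new SchemaBuilder()
        .add("myLmt", MinorType.INT)
        .build();
    SchemaUtils.markImplicit(
        providedSchema.metadata("myLmt"),
        ScanTestUtils.LAST_MODIFIED_TIME_COL);
    // The test expects subsequent implicit-column parsing to fail with
    // "references an undefined implicit column type".
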
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
index 5468175..66f10fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
@@ -135,7 +135,6 @@
     public void close() {}
   }
 
-
   /**
    * Mock reader with no data or schema, indicated by an early EOF
    * exception.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java
index 7e12260..b6becd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestMissingColumnLoader.java
@@ -21,7 +21,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -49,7 +49,7 @@
  * can create the classic nullable Int null column, or one of
  * any other type and mode.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestMissingColumnLoader extends SubOperatorTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
index 0ec3196..ff83b1d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestOutputBatchBuilder.java
@@ -22,7 +22,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.OutputBatchBuilder.BatchSource;
 import org.apache.drill.exec.physical.rowSet.RowSet;
@@ -36,7 +36,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestOutputBatchBuilder extends SubOperatorTest {
 
   public TupleMetadata firstSchema() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java
index 1500295..7809904 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestReaderErrors.java
@@ -22,7 +22,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
 import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
@@ -35,7 +35,7 @@
  * Verifies proper handling of errors from a reader, including use of the
  * scan and reader error contexts.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestReaderErrors extends BaseTestScanLifecycle {
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java
index 75700c1..24a083d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleBasics.java
@@ -23,7 +23,7 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
@@ -34,13 +34,14 @@
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.rowSet.RowSetUtilities;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanLifecycleBasics extends BaseTestScanLifecycle {
 
   /**
@@ -122,11 +123,17 @@
     // Early schema: so output schema is available after open
     assertTrue(scan.hasOutputSchema());
     assertEquals(SCHEMA, scan.outputSchema());
-    assertFalse(reader.next());
-    reader.close();
+    assertTrue(reader.next());
+    VectorContainer result = reader.output();
+    assertEquals(0, result.getRecordCount());
+    result.zeroVectors();
 
     // Early schema with no additional columns discovered
     assertEquals(SCHEMA, scan.outputSchema());
+
+    // But, no second batch.
+    assertFalse(reader.next());
+    reader.close();
     scan.close();
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java
index 3e3b833..334753d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleSchema.java
@@ -23,7 +23,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
 import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
@@ -37,7 +37,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanLifecycleSchema extends BaseTestScanLifecycle {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java
index 9815b94..e9c571d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleTwoReaders.java
@@ -21,7 +21,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
 import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
@@ -37,7 +37,7 @@
  * Test two readers in succession in various cases: empty readers, normal readers,
  * type conflicts, etc.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanLifecycleTwoReaders extends BaseTestScanLifecycle {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java
index 4dcc4ec..b7fa328 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestDynamicSchemaFilter.java
@@ -24,7 +24,7 @@
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -39,7 +39,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestDynamicSchemaFilter {
 
   private static final ColumnMetadata A_COL =
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java
index 491be8b..57277b5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectedPath.java
@@ -20,7 +20,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
@@ -37,7 +37,7 @@
  * to see if the projection path is consistent with the type. Tests here
  * verify the consistency checks.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestProjectedPath {
 
   // INT is a proxy for all scalar columns.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java
index 02d05a1..d67d7e6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestProjectionParser.java
@@ -27,7 +27,7 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanProjectionParser.ProjectionParseResult;
 import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
@@ -46,7 +46,7 @@
  * parsing; the only bits not tested here is that which is
  * inherently specific to some use case.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestProjectionParser extends BaseTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java
index 6ccae71..075e774 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTracker.java
@@ -23,10 +23,11 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
@@ -43,7 +44,7 @@
  * Test the scan operator schema tracker which computes the final
  * output schema from a variety of sources.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanSchemaTracker extends BaseTest {
   private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
 
@@ -294,6 +295,38 @@
     assertEquals(expected, outputSchema);
   }
 
+  /**
+   * Test for a query of the form:<br>
+   * {@code SELECT * FROM t ORDER BY a}<br>
+   * in which we get a projection list of the form<br>
+   * {@code [`**`, `a`]}<br>
+   * If we are given a provided schema of {@code (a, b, c)},
+   * the "natural" expansion will be {@code (b, c, a)}, but we
+   * add a hack to get what the user expects: {@code (a, b, c)}.
+   * The "natural" expansion occurs because the projection list says
+   * "add all columns to the wildcard position except those mentioned
+   * elsewhere". We essentially redefine it as "add or move all columns
+   * in provided schema order."
+   */
+  @Test
+  public void testWildcardWithExplicitWithProvided() {
+
+    // Simulate SELECT *, a ...
+    final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+        .projection(RowSetTestUtils.projectList(
+            SchemaPath.DYNAMIC_STAR, "a"));
+
+    final TupleMetadata providedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.BIGINT)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+    builder.providedSchema(providedSchema);
+    final ScanSchemaTracker schemaTracker = builder.build();
+    assertTrue(schemaTracker.isResolved());
+    assertEquals(providedSchema, schemaTracker.outputSchema());
+  }
+
   @Test
   public void testStrictProvidedSchemaWithWildcard() {
 
@@ -467,4 +500,57 @@
     final TupleMetadata outputSchema = schemaTracker.outputSchema();
     assertTrue(outputSchema.isEmpty());
   }
+
+  /**
+   * Test for a query of the form:<br>
+   * {@code SELECT * FROM t ORDER BY a}<br>
+   * in which we get a projection list of the form<br>
+   * {@code [`**`, `a`]}<br>
+   * If we are given a reader schema of {@code (a, b, c)},
+   * the "natural" expansion will be {@code (b, c, a)}, but we
+   * add a hack to get what the user expects: {@code (a, b, c)}.
+   * The "natural" expansion occurs because the projection list says
+   * "add all columns to the wildcard position except those mentioned
+   * elsewhere". We essentially redefine it as "add or move all columns
+   * in reader schema order."
+   */
+  @Test
+  public void testWildcardWithExplicitWithReaderSchema() {
+
+    // Simulate SELECT *, a ...
+    final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+        .projection(RowSetTestUtils.projectList(
+            SchemaPath.DYNAMIC_STAR, "a"));
+
+    final TupleMetadata readerOutputSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.BIGINT)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+    final ScanSchemaTracker schemaTracker = builder.build();
+    schemaTracker.applyReaderSchema(readerOutputSchema, ERROR_CONTEXT);
+    assertTrue(schemaTracker.isResolved());
+    assertEquals(readerOutputSchema, schemaTracker.outputSchema());
+  }
+
+  @Test
+  public void testWildcardWithExplicitWithProvidedAndReaderSchema() {
+
+    // Simulate SELECT *, a ...
+    final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+        .projection(RowSetTestUtils.projectList(
+            SchemaPath.DYNAMIC_STAR, "a"));
+
+    final TupleMetadata providedSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.BIGINT)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+    builder.providedSchema(providedSchema);
+    final ScanSchemaTracker schemaTracker = builder.build();
+    schemaTracker.applyReaderSchema(providedSchema, ERROR_CONTEXT);
+    assertTrue(schemaTracker.isResolved());
+    assertEquals(providedSchema, schemaTracker.outputSchema());
+  }
+
 }
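
The new wildcard tests encode the rule spelled out in their javadoc: for a projection list of [`**`, `a`] (as produced by SELECT * ... ORDER BY a), the wildcard expansion keeps provided- or reader-schema order instead of pushing the explicitly named column to the end. Distilled from the provided-schema test; the (a, b, c) schema is the test's example.

    // Projection is the wildcard plus an explicitly named column.
    ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
        .projection(RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"));
    builder.providedSchema(new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.BIGINT)
        .add("c", MinorType.VARCHAR)
        .buildSchema());
    ScanSchemaTracker schemaTracker = builder.build();

    // Resolved immediately; the output schema is (a, b, c), not (b, c, a).
    schemaTracker.isResolved();    // true
    schemaTracker.outputSchema();  // columns a, b, c in that order
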
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java
index c7e4851..dda57c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestScanSchemaTrackerMaps.java
@@ -23,7 +23,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -44,7 +44,7 @@
  * defined schema, scan schema, reader schema and missing columns schemas
  * must all be trees, and must all be kept in sync.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestScanSchemaTrackerMaps extends BaseTest {
   private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java
index 76aba7f..fc343bd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerDefined.java
@@ -23,7 +23,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
@@ -38,7 +38,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestSchemaTrackerDefined {
   private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java
index af74205..5dab1ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerEarlyReaderSchema.java
@@ -22,13 +22,15 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
-import org.apache.drill.exec.physical.impl.scan.v3.schema.ImplicitColumnResolver.ImplicitColumnOptions;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver;
+import org.apache.drill.exec.physical.impl.scan.v3.file.ImplicitColumnResolver.ImplicitColumnOptions;
 import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -41,7 +43,7 @@
  * Basic tests of early reader against a project list are in
  * {@link TestSchemaTrackerInputSchema}.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestSchemaTrackerEarlyReaderSchema extends SubOperatorTest {
   protected static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
   protected static final TupleMetadata SCHEMA = BaseTestSchemaTracker.SCHEMA;
@@ -169,4 +171,23 @@
       assertTrue(e.getMessage().contains("Reader column: `a` INT"));
     }
   }
+
+  @Test
+  public void testWildcardWithExplicitWithEarlyReaderSchema() {
+
+    // Simulate SELECT *, a ...
+    final ScanSchemaConfigBuilder builder = new ScanSchemaConfigBuilder()
+        .projection(RowSetTestUtils.projectList(
+            SchemaPath.DYNAMIC_STAR, "a"));
+
+    final TupleMetadata readerSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.BIGINT)
+        .add("c", MinorType.VARCHAR)
+        .buildSchema();
+    final ScanSchemaTracker schemaTracker = builder.build();
+    schemaTracker.applyEarlyReaderSchema(readerSchema);
+    assertTrue(schemaTracker.isResolved());
+    assertEquals(readerSchema, schemaTracker.outputSchema());
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java
index 40e860f..2113d01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerInputSchema.java
@@ -27,7 +27,7 @@
 import java.util.Collections;
 import java.util.function.Consumer;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -47,7 +47,7 @@
  * before any data is seen. There are subtle differences explored
  * here and in {@link TestSchemaTrackerEarlyReaderSchema}.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestSchemaTrackerInputSchema extends BaseTestSchemaTracker {
 
   private void testBoth(Collection<SchemaPath> projList, TupleMetadata schema,
@@ -175,25 +175,6 @@
     });
   }
 
-  /**
-   * Verify a reasonable error if the name of an implied implicit
-   * column (one that appears with a wildcard) conflicts with a
-   * provided column name. We cannot project two columns with that
-   * same name. Does not occur for a reader schema.
-   */
-  @Test
-  public void testWithWildcardImplicitConflict() {
-    ProjectionSchemaTracker tracker = trackerFor(
-        RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR, "y"));
-    try {
-      tracker.applyProvidedSchema(SCHEMA);
-      fail();
-    } catch (UserException e) {
-      assertTrue(e.getMessage().contains("implicit"));
-      assertTrue(e.getMessage().contains("Column: a"));
-    }
-  }
-
   @Test
   public void testProvidedMapProjectConflict() {
     ProjectionSchemaTracker tracker = trackerFor(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java
index 9587e9c..ffb1854 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/schema/TestSchemaTrackerProjection.java
@@ -19,13 +19,14 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.Collections;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.expression.SchemaPath;
@@ -41,7 +42,7 @@
  * Test the first step of scan schema resolution: translating from the
  * projection parser to a dynamic schema ready for resolution.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestSchemaTrackerProjection extends BaseTest {
   private static final CustomErrorContext ERROR_CONTEXT = EmptyErrorContext.INSTANCE;
 
@@ -55,7 +56,7 @@
     ProjectionSchemaTracker tracker = schemaTracker(
         Collections.emptyList());
     assertTrue(tracker.isResolved());
-    assertEquals(0, tracker.schemaVersion());
+    assertEquals(1, tracker.schemaVersion());
     assertSame(ScanSchemaTracker.ProjectionType.NONE, tracker.projectionType());
     assertTrue(tracker.internalSchema().toSchema().isEmpty());
     ProjectionFilter filter = tracker.projectionFilter(ERROR_CONTEXT);
@@ -67,7 +68,7 @@
     ProjectionSchemaTracker tracker = schemaTracker(
         RowSetTestUtils.projectAll());
     assertFalse(tracker.isResolved());
-    assertEquals(0, tracker.schemaVersion());
+    assertEquals(1, tracker.schemaVersion());
     assertSame(ScanSchemaTracker.ProjectionType.ALL, tracker.projectionType());
     assertTrue(tracker.internalSchema().toSchema().isEmpty());
     ProjectionFilter filter = tracker.projectionFilter(ERROR_CONTEXT);
@@ -81,6 +82,8 @@
     assertFalse(tracker.isResolved());
     assertTrue(0 < tracker.schemaVersion());
     assertSame(ScanSchemaTracker.ProjectionType.ALL, tracker.projectionType());
+    assertNotNull(tracker.columnProjection("a"));
+    assertNotNull(tracker.columnProjection("b"));
     TupleMetadata schema = tracker.internalSchema().toSchema();
     assertEquals(2, schema.size());
     assertTrue(schema.metadata(0).isDynamic());
@@ -100,4 +103,21 @@
     ProjectionFilter filter = tracker.projectionFilter(ERROR_CONTEXT);
     assertTrue(filter instanceof DynamicSchemaFilter);
   }
+
+  @Test
+  public void testExplicitArray() {
+    ProjectionSchemaTracker tracker = schemaTracker(
+        RowSetTestUtils.projectList("a[1]", "a[3]"));
+    assertSame(ScanSchemaTracker.ProjectionType.SOME, tracker.projectionType());
+
+    ProjectedColumn projCol = tracker.columnProjection("a");
+    assertNotNull(projCol);
+    boolean[] indexes = projCol.indexes();
+    assertNotNull(indexes);
+    assertEquals(4, indexes.length);
+    assertFalse(indexes[0]);
+    assertTrue(indexes[1]);
+    assertFalse(indexes[2]);
+    assertTrue(indexes[3]);
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
index 1bee697..1c8ed95 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProjection.java
@@ -30,7 +30,7 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -61,7 +61,7 @@
 /**
  * Test of the basics of the projection mechanism.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestResultSetLoaderProjection extends SubOperatorTest {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
index e4ce405..d4ae1ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvHeader.java
@@ -23,7 +23,7 @@
 import java.nio.file.Paths;
 import java.util.List;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestBuilder;
@@ -31,7 +31,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestCsvHeader extends BaseTestQuery{
 
   private static final String ROOT = "store/text/data/cars.csvh";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
index 0b3c7d6..80c5ca4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvIgnoreHeaders.java
@@ -22,7 +22,7 @@
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -33,7 +33,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestCsvIgnoreHeaders extends BaseCsvTest{
 
   private static String withHeaders[] = {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
index 57ae662..9912ca4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvTableProperties.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.store.easy.text.compliant;
 
 import org.apache.drill.TestSelectWithOption;
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -53,7 +53,7 @@
  * @see TestSelectWithOption for similar tests using table
  * properties within SQL
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestCsvTableProperties extends BaseCsvTest {
 
   @BeforeClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 03a98ac..a0bfbbf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -23,7 +23,7 @@
 import java.io.PrintWriter;
 import java.util.Iterator;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
@@ -60,7 +60,7 @@
  *
  * @see TestHeaderBuilder
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestCsvWithHeaders extends BaseCsvTest {
 
   private static final String TEST_FILE_NAME = "basic.csv";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 0458aee..ac27d2f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -24,7 +24,7 @@
 
 import java.util.Iterator;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -47,7 +47,7 @@
  * The focus here is on the schema mechanism, which can be explored
  * with simple tables of just a few rows.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestCsvWithSchema extends BaseCsvTest {
 
   protected static final String FILE1_NAME = "file1.csv";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index 872574c..c724a71 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -26,7 +26,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.TestEmptyInputSql;
@@ -47,7 +47,7 @@
  * and without an external schema. Data is represented with the
  * `columns` array column.
  */
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestCsvWithoutHeaders extends BaseCsvTest {
 
   private static final String TEST_FILE_NAME = "simple.csv";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
index 76c9d5d..ab4ee01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
@@ -25,7 +25,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.drill.categories.EvfTests;
+import org.apache.drill.categories.EvfTest;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -54,7 +54,7 @@
  * current "V3" version. The tests here verify this behavior.
  */
 
-@Category(EvfTests.class)
+@Category(EvfTest.class)
 public class TestPartitionRace extends BaseCsvTest {
 
   @BeforeClass
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index e451dbb..0be54f3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -329,12 +329,11 @@
    * Run the query and return the first non-empty batch as a
    * {@link DirectRowSet} object that can be inspected directly
    * by the code using a {@link RowSetReader}.
-   * <p>
    *
    * @see #rowSetIterator() for a version that reads a series of
    * batches as row sets.
    * @return a row set that represents the first non-empty batch returned from
-   * the query
+   * the query, or {@code null} if the query returns no data (no batches)
    * @throws RpcException if anything goes wrong
    */
   public DirectRowSet rowSet() throws RpcException {
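
The reworked Javadoc above makes the contract explicit: rowSet() can return null when the query produces no batches at all. A minimal defensive-use sketch, assuming the usual ClientFixture pattern (query text and table are illustrative):

    // Hedged sketch: guard against the null result and always free buffers.
    DirectRowSet result = client.queryBuilder()
        .sql("SELECT * FROM `dfs.data`.`empty.csvh`")   // hypothetical table
        .rowSet();
    if (result == null) {
      // Query returned no data: nothing to read or release.
    } else {
      try {
        RowSetReader reader = result.reader();
        while (reader.next()) {
          // Inspect the row via reader.scalar(...), etc.
        }
      } finally {
        result.clear();   // release the underlying vectors
      }
    }
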
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 9aee7eb..b21e22a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -241,6 +241,10 @@
     return new PrimitiveColumnMetadata(field);
   }
 
+  public static boolean isScalar(ColumnMetadata col) {
+    return isScalar(col.type());
+  }
+
   public static boolean isScalar(MinorType type) {
     return !isComplex(type);
   }