DRILL-7436: Fix record count, vector structure issues in several operators

Adds additional vector checks to the BatchValidator.

Enables checking for the following operators:

* FilterRecordBatch
* PartitionLimitRecordBatch
* UnnestRecordBatch
* HashAggBatch
* RemovingRecordBatch

Fixes vector count issues for each of these.

Fixes empty-batch (record count = 0) handling in several of the
above operators. Adds a method to VectorContainer to correctly
create an empty batch. (An empty batch, counter-intuitively,
needs vectors allocated to hold the 0 value in the first
position of each offset vector.)
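
For reference, the empty-batch pattern used in this change looks
roughly like the following sketch (illustrative only; it uses the
container calls visible in the diff below, and the helper name is
hypothetical):

    // Sketch: producing a valid zero-row batch in an operator.
    // Offset vectors are well-formed only if position 0 holds 0,
    // so vectors must be allocated even when there are no rows.
    void emitEmptyBatch(VectorContainer container) {
      container.allocateNew();      // allocate vectors (offset slot 0 = 0)
      container.setValueCount(0);   // mark every vector as holding 0 values
      container.setRecordCount(0);  // mark the batch as holding 0 records
    }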

Disables verbose logging for MongoDB tests. Details are written to
the log rather than the console.

Disables two invalid Mongo tests. See DRILL-7428.

Adjusts the expression tree materializer to not add the LATE type
to Union vectors. (See DRILL-7435.)

Ensures that Union vectors contain valid vectors for each subtype.
The present fix is a work-around; see DRILL-7434 for a better
long-term fix.

Cleans up code formatting and other minor issues in each file touched
during the fixes in this PR.
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java
index 7485a30..6e69447 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestConstants.java
@@ -77,6 +77,9 @@
   String TEST_BOOLEAN_FILTER_QUERY_TEMPLATE4 = "select `employee_id` from mongo.%s.`%s` where (position_id = 16 and isFTE = true) or last_name = 'Yonce'";
 
   String TEST_STAR_QUERY_UNSHARDED_DB = "select * from mongo.%s.`%s`";
-  String TEST_STAR_QUERY_UNSHARDED_DB_PROJECT_FILTER = "select t.name as name,t.topping.type as type from mongo.%s.`%s` t where t.sales >= 150";
-  String TEST_STAR_QUERY_UNSHARDED_DB_GROUP_PROJECT_FILTER = "select t.topping.type as type,count(t.topping.type) as typeCount from mongo.%s.`%s` t group by t.topping.type order by typeCount";
+  // This query is invalid. See DRILL-7420. Topping is a repeated map.
+  // Drill should not allow projecting a repeated map to the top level; this should
+  // require a flatten or lateral query.
+  String TEST_STAR_QUERY_UNSHARDED_DB_PROJECT_FILTER = "select t.name as name, t.topping.type as type from mongo.%s.`%s` t where t.sales >= 150";
+  String TEST_STAR_QUERY_UNSHARDED_DB_GROUP_PROJECT_FILTER = "select t.topping.type as type, count(t.topping.type) as typeCount from mongo.%s.`%s` t group by t.topping.type order by typeCount";
 }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
index eed94f1..7910a57 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
@@ -24,9 +24,9 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.bson.Document;
 import org.bson.conversions.Bson;
 import org.junit.AfterClass;
@@ -44,6 +44,7 @@
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.IndexOptions;
 import com.mongodb.client.model.Indexes;
+
 import de.flapdoodle.embed.mongo.Command;
 import de.flapdoodle.embed.mongo.MongodExecutable;
 import de.flapdoodle.embed.mongo.MongodProcess;
@@ -212,8 +213,10 @@
           .net(new Net(LOCALHOST, MONGOS_PORT, Network.localhostIsIPv6()))
           .cmdOptions(cmdOptions).build();
 
-      IRuntimeConfig runtimeConfig = new RuntimeConfigBuilder().defaults(
-          Command.MongoD).build();
+      // Configure to write Mongo messages to the log. Change this to
+      // defaults() when debugging; that will write to the console instead.
+      IRuntimeConfig runtimeConfig = new RuntimeConfigBuilder().defaultsWithLogger(
+          Command.MongoD, logger).build();
       mongodExecutable = MongodStarter.getInstance(runtimeConfig).prepare(
           mongodConfig);
       mongod = mongodExecutable.start();
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index cdec4f7..4b20ebc 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -19,6 +19,7 @@
 
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.categories.SlowTest;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -86,6 +87,7 @@
   }
 
   @Test
+  @Ignore("DRILL-7428") // Query is invalid, Drill bug allows it.
   public void testUnShardedDBInShardedClusterWithProjectionAndFilter() throws Exception {
     testBuilder()
         .sqlQuery(String.format(TEST_STAR_QUERY_UNSHARDED_DB_PROJECT_FILTER, DONUTS_DB, DONUTS_COLLECTION))
@@ -95,6 +97,7 @@
   }
 
   @Test
+  @Ignore("DRILL-7428") // Query is invalid, Drill bug allows it.
   public void testUnShardedDBInShardedClusterWithGroupByProjectionAndFilter() throws Exception {
     testBuilder()
         .sqlQuery(String.format(TEST_STAR_QUERY_UNSHARDED_DB_GROUP_PROJECT_FILTER, DONUTS_DB, DONUTS_COLLECTION))
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
index 3ef4617..6051fcd 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestTableGenerator.java
@@ -21,18 +21,20 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 
-import de.flapdoodle.embed.mongo.MongoImportProcess;
+import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
-
+import de.flapdoodle.embed.mongo.Command;
 import de.flapdoodle.embed.mongo.MongoImportExecutable;
+import de.flapdoodle.embed.mongo.MongoImportProcess;
 import de.flapdoodle.embed.mongo.MongoImportStarter;
 import de.flapdoodle.embed.mongo.config.IMongoImportConfig;
 import de.flapdoodle.embed.mongo.config.MongoImportConfigBuilder;
 import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.RuntimeConfigBuilder;
 import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.config.IRuntimeConfig;
 import de.flapdoodle.embed.process.runtime.Network;
 
 public class TestTableGenerator implements MongoTestConstants {
@@ -56,8 +58,15 @@
         .net(new Net(MONGOS_PORT, Network.localhostIsIPv6())).db(dbName)
         .collection(collection).upsert(upsert).dropCollection(drop)
         .jsonArray(jsonArray).importFile(jsonFile).build();
+    // Configure to write Mongo messages to the log. Change this to
+    // .getDefaultInstance() when debugging; that will write to
+    // the console instead.
+    IRuntimeConfig rtConfig = new RuntimeConfigBuilder()
+        .defaultsWithLogger(Command.MongoImport, logger)
+        .daemonProcess(false)
+        .build();
     MongoImportExecutable importExecutable = MongoImportStarter
-        .getDefaultInstance().prepare(mongoImportConfig);
+        .getInstance(rtConfig).prepare(mongoImportConfig);
     MongoImportProcess importProcess = importExecutable.start();
 
     // import is in a separate process, we should wait until the process exit
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 1f6ff07..946f3d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -26,11 +26,6 @@
 import java.util.Optional;
 import java.util.Queue;
 
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.metastore.util.SchemaPathUtils;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.CastExpression;
@@ -82,40 +77,52 @@
 import org.apache.drill.exec.expr.fn.FunctionLookupContext;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.resolver.FunctionResolver;
 import org.apache.drill.exec.resolver.FunctionResolverFactory;
 import org.apache.drill.exec.resolver.TypeCastRules;
-
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.metastore.util.SchemaPathUtils;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ExpressionTreeMaterializer {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
+  static final Logger logger = LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
 
-  private ExpressionTreeMaterializer() {
-  }
+  private ExpressionTreeMaterializer() { }
 
-  public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) {
+  public static LogicalExpression materialize(LogicalExpression expr,
+      VectorAccessible batch, ErrorCollector errorCollector,
+      FunctionLookupContext functionLookupContext) {
     return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false, false);
   }
 
-  public static LogicalExpression materializeAndCheckErrors(LogicalExpression expr, VectorAccessible batch, FunctionLookupContext functionLookupContext) throws SchemaChangeException {
+  public static LogicalExpression materializeAndCheckErrors(LogicalExpression expr,
+      VectorAccessible batch, FunctionLookupContext functionLookupContext)
+          throws SchemaChangeException {
     ErrorCollector collector = new ErrorCollectorImpl();
     LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext, false, false);
     if (collector.hasErrors()) {
-      throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+      throw new SchemaChangeException(String.format(
+          "Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
     }
     return e;
   }
 
-  public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext,
-                                              boolean allowComplexWriterExpr) {
+  public static LogicalExpression materialize(LogicalExpression expr,
+      VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext,
+      boolean allowComplexWriterExpr) {
     return materialize(expr, batch, errorCollector, functionLookupContext, allowComplexWriterExpr, false);
   }
 
-  public static LogicalExpression materializeFilterExpr(LogicalExpression expr, TupleMetadata fieldTypes, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) {
+  public static LogicalExpression materializeFilterExpr(LogicalExpression expr,
+      TupleMetadata fieldTypes, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) {
     final FilterMaterializeVisitor filterMaterializeVisitor = new FilterMaterializeVisitor(fieldTypes, errorCollector);
     return expr.accept(filterMaterializeVisitor, functionLookupContext);
   }
@@ -140,19 +147,27 @@
                                               boolean unionTypeEnabled) {
     Map<VectorAccessible, BatchReference> batches = Maps.newHashMap();
     batches.put(batch, null);
-    return materialize(expr, batches, errorCollector, functionLookupContext, allowComplexWriterExpr, unionTypeEnabled);
+    return materialize(expr, batches, errorCollector, functionLookupContext,
+        allowComplexWriterExpr, unionTypeEnabled);
   }
 
   /**
-   * Materializes logical expression taking into account passed parameters.
-   * Is used to materialize logical expression that can contain several batches with or without custom batch reference.
+   * Materializes logical expression taking into account passed parameters. Is
+   * used to materialize logical expression that can contain several batches
+   * with or without custom batch reference.
    *
-   * @param expr logical expression to be materialized
-   * @param batches one or more batch instances used in expression
-   * @param errorCollector error collector
-   * @param functionLookupContext context to find drill function holder
-   * @param allowComplexWriterExpr true if complex expressions are allowed
-   * @param unionTypeEnabled true if union type is enabled
+   * @param expr
+   *          logical expression to be materialized
+   * @param batches
+   *          one or more batch instances used in expression
+   * @param errorCollector
+   *          error collector
+   * @param functionLookupContext
+   *          context to find drill function holder
+   * @param allowComplexWriterExpr
+   *          true if complex expressions are allowed
+   * @param unionTypeEnabled
+   *          true if union type is enabled
    * @return materialized logical expression
    */
   public static LogicalExpression materialize(LogicalExpression expr,
@@ -177,7 +192,8 @@
     }
   }
 
-  public static LogicalExpression convertToNullableType(LogicalExpression fromExpr, MinorType toType, FunctionLookupContext functionLookupContext, ErrorCollector errorCollector) {
+  public static LogicalExpression convertToNullableType(LogicalExpression fromExpr,
+      MinorType toType, FunctionLookupContext functionLookupContext, ErrorCollector errorCollector) {
     String funcName = "convertToNullable" + toType.toString();
     List<LogicalExpression> args = Lists.newArrayList();
     args.add(fromExpr);
@@ -193,11 +209,13 @@
     return matchedConvertToNullableFuncHolder.getExpr(funcName, args, ExpressionPosition.UNKNOWN);
   }
 
-  public static LogicalExpression addCastExpression(LogicalExpression fromExpr, MajorType toType, FunctionLookupContext functionLookupContext, ErrorCollector errorCollector) {
+  public static LogicalExpression addCastExpression(LogicalExpression fromExpr,
+      MajorType toType, FunctionLookupContext functionLookupContext, ErrorCollector errorCollector) {
     return addCastExpression(fromExpr, toType, functionLookupContext, errorCollector, true);
   }
 
-  public static LogicalExpression addCastExpression(LogicalExpression fromExpr, MajorType toType, FunctionLookupContext functionLookupContext, ErrorCollector errorCollector, boolean exactResolver) {
+  public static LogicalExpression addCastExpression(LogicalExpression fromExpr, MajorType toType,
+      FunctionLookupContext functionLookupContext, ErrorCollector errorCollector, boolean exactResolver) {
     String castFuncName = FunctionReplacementUtils.getCastFunc(toType.getMinorType());
     List<LogicalExpression> castArgs = Lists.newArrayList();
     castArgs.add(fromExpr);  //input_expr
@@ -330,9 +348,9 @@
   }
 
   private abstract static class AbstractMaterializeVisitor extends AbstractExprVisitor<LogicalExpression, FunctionLookupContext, RuntimeException> {
-    private ExpressionValidator validator = new ExpressionValidator();
+    private final ExpressionValidator validator = new ExpressionValidator();
     private ErrorCollector errorCollector;
-    private Deque<ErrorCollector> errorCollectors = new ArrayDeque<>();
+    private final Deque<ErrorCollector> errorCollectors = new ArrayDeque<>();
     private final boolean allowComplexWriter;
     /**
      * If this is false, the materializer will not handle or create UnionTypes
@@ -351,6 +369,7 @@
       return newExpr;
     }
 
+    @Override
     public abstract LogicalExpression visitSchemaPath(SchemaPath path, FunctionLookupContext functionLookupContext);
 
     @Override
@@ -359,8 +378,11 @@
     }
 
     @Override
-    public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression holder, FunctionLookupContext functionLookupContext) {
-      // a function holder is already materialized, no need to rematerialize.  generally this won't be used unless we materialize a partial tree and rematerialize the whole tree.
+    public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression holder,
+        FunctionLookupContext functionLookupContext) {
+      // a function holder is already materialized, no need to rematerialize.
+      // generally this won't be used unless we materialize a partial tree and
+      // rematerialize the whole tree.
       return holder;
     }
 
@@ -398,7 +420,9 @@
       DrillFuncHolder matchedFuncHolder = functionLookupContext.findDrillFunction(resolver, call);
 
       if (matchedFuncHolder instanceof DrillComplexWriterFuncHolder && ! allowComplexWriter) {
-        errorCollector.addGeneralError(call.getPosition(), "Only ProjectRecordBatch could have complex writer function. You are using complex writer function " + call.getName() + " in a non-project operation!");
+        errorCollector.addGeneralError(call.getPosition(),
+            "Only ProjectRecordBatch could have complex writer function. You are using complex writer function "
+                + call.getName() + " in a non-project operation!");
       }
 
       //new arg lists, possible with implicit cast inserted.
@@ -414,13 +438,15 @@
 
           // Case 1: If  1) the argument is NullExpression
           //             2) the minor type of parameter of matchedFuncHolder is not LATE (the type of null expression is still unknown)
-          //             3) the parameter of matchedFuncHolder allows null input, or func's null_handling is NULL_IF_NULL (means null and non-null are exchangeable).
+          //             3) the parameter of matchedFuncHolder allows null input, or func's null_handling
+          //                is NULL_IF_NULL (means null and non-null are exchangeable).
           //         then replace NullExpression with a TypedNullConstant
           if (currentArg.equals(NullExpression.INSTANCE) && !MinorType.LATE.equals(parmType.getMinorType()) &&
               (TypeProtos.DataMode.OPTIONAL.equals(parmType.getMode()) ||
               matchedFuncHolder.getNullHandling() == FunctionTemplate.NullHandling.NULL_IF_NULL)) {
             argsWithCast.add(new TypedNullConstant(parmType));
-          } else if (Types.softEquals(parmType, currentArg.getMajorType(), matchedFuncHolder.getNullHandling() == FunctionTemplate.NullHandling.NULL_IF_NULL) ||
+          } else if (Types.softEquals(parmType, currentArg.getMajorType(),
+              matchedFuncHolder.getNullHandling() == FunctionTemplate.NullHandling.NULL_IF_NULL) ||
                      matchedFuncHolder.isFieldReader(i)) {
             // Case 2: argument and parameter matches, or parameter is FieldReader.  Do nothing.
             argsWithCast.add(currentArg);
@@ -494,12 +520,11 @@
     }
 
     /**
-     * Converts a function call with a Union type input into a case statement, where each branch of the case corresponds to
-     * one of the subtypes of the Union type. The function call is materialized in each of the branches, with the union input cast
-     * to the specific type corresponding to the branch of the case statement
-     * @param call
-     * @param functionLookupContext
-     * @return
+     * Converts a function call with a Union type input into a case statement,
+     * where each branch of the case corresponds to one of the subtypes of the
+     * Union type. The function call is materialized in each of the branches,
+     * with the union input cast to the specific type corresponding to the
+     * branch of the case statement
      */
     private LogicalExpression rewriteUnionFunction(FunctionCall call, FunctionLookupContext functionLookupContext) {
       LogicalExpression[] args = new LogicalExpression[call.args.size()];
@@ -527,9 +552,12 @@
             newArgs.add(e.accept(new CloneVisitor(), null));
           }
 
-          // When expanding the expression tree to handle the different subtypes, we will not throw an exception if one
-          // of the branches fails to find a function match, since it is possible that code path will never occur in execution
-          // So instead of failing to materialize, we generate code to throw the exception during execution if that code
+          // When expanding the expression tree to handle the different
+          // subtypes, we will not throw an exception if one
+          // of the branches fails to find a function match, since it is
+          // possible that code path will never occur in execution
+          // So instead of failing to materialize, we generate code to throw the
+          // exception during execution if that code
           // path is hit.
 
           errorCollectors.push(errorCollector);
@@ -602,6 +630,7 @@
       return new FunctionCall(isFuncName, args, ExpressionPosition.UNKNOWN);
     }
 
+    @Override
     public LogicalExpression visitIfExpression(IfExpression ifExpr, FunctionLookupContext functionLookupContext) {
       IfExpression.IfCondition conditions = ifExpr.ifCondition;
       LogicalExpression newElseExpr = ifExpr.elseExpression.accept(this, functionLookupContext);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/annotations/FunctionTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/annotations/FunctionTemplate.java
index 62025f5..0e0a48a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/annotations/FunctionTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/annotations/FunctionTemplate.java
@@ -17,6 +17,12 @@
  */
 package org.apache.drill.exec.expr.annotations;
 
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.List;
+
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.FunctionAttributes;
@@ -30,12 +36,6 @@
 import org.apache.drill.exec.expr.fn.output.SameInOutLengthReturnTypeInference;
 import org.apache.drill.exec.expr.fn.output.StringCastReturnTypeInference;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.List;
-
 
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ElementType.TYPE})
@@ -76,16 +76,20 @@
   FunctionCostCategory costCategory() default FunctionCostCategory.SIMPLE;
 
   /**
-   * <p>Set Operand type-checking strategy for an operator which takes no operands and need to be invoked
-   * without parentheses. E.g.: session_id is a niladic function.</p>
+   * <p>Set Operand type-checking strategy for an operator which takes no operands
+   * and need to be invoked without parentheses. E.g.: session_id is a niladic
+   * function.</p>
    *
-   * <p>Niladic functions override columns that have names same as any niladic function. Such columns cannot be
-   * queried without the table qualification. Value of the niladic function is returned when table
-   * qualification is not used.</p>
+   * <p>Niladic functions override columns that have names same as any niladic
+   * function. Such columns cannot be queried without the table qualification.
+   * Value of the niladic function is returned when table qualification is not
+   * used.</p>
    *
    * <p>For e.g. in the case of session_id:<br/>
-   * select session_id from table -> returns the value of niladic function session_id<br/>
-   * select t1.session_id from table t1 -> returns session_id column value from table<p>
+   * select session_id from table -> returns the value of niladic function
+   * session_id<br/>
+   * select t1.session_id from table t1 -> returns session_id column value from
+   * table</p>
    */
   boolean isNiladic() default false;
   boolean checkPrecisionRange() default false;
@@ -101,7 +105,7 @@
   boolean isVarArg() default false;
 
   /**
-   * This enum will be used to estimate the average size of the output
+   * Estimates the average size of the output
    * produced by a function that produces variable length output
    */
   enum OutputWidthCalculatorType {
@@ -194,7 +198,6 @@
     public TypeProtos.MajorType getType(List<LogicalExpression> logicalExpressions, FunctionAttributes attributes) {
       return inference.getType(logicalExpressions, attributes);
     }
-
   }
 
   enum FunctionCostCategory {
@@ -213,6 +216,5 @@
     public static FunctionCostCategory getDefault() {
       return SIMPLE;
     }
-
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 043eb1e..7f4acc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -17,18 +17,9 @@
  */
 package org.apache.drill.exec.expr.fn;
 
-import com.sun.codemodel.JAssignmentTarget;
-import com.sun.codemodel.JCodeModel;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JInvocation;
-import org.apache.drill.exec.expr.holders.RepeatedListHolder;
-import org.apache.drill.exec.expr.holders.ValueHolder;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Strings;
-import com.sun.codemodel.JBlock;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JType;
-import com.sun.codemodel.JVar;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -50,16 +41,28 @@
 import org.apache.drill.exec.expr.fn.output.OutputWidthCalculator;
 import org.apache.drill.exec.expr.holders.ListHolder;
 import org.apache.drill.exec.expr.holders.MapHolder;
+import org.apache.drill.exec.expr.holders.RepeatedListHolder;
 import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
+import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
+import com.sun.codemodel.JAssignmentTarget;
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JCodeModel;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JType;
+import com.sun.codemodel.JVar;
 
 public abstract class DrillFuncHolder extends AbstractFuncHolder {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolder.class);
+  static final Logger logger = LoggerFactory.getLogger(DrillFuncHolder.class);
 
   private final FunctionAttributes attributes;
   private final FunctionInitializer initializer;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
index ae6d289..54e176c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillSimpleFuncHolder.java
@@ -19,7 +19,8 @@
 
 import static org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkNotNull;
 
-import com.sun.codemodel.JOp;
+import java.util.List;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.FunctionHolderExpression;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -29,16 +30,15 @@
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.DrillSimpleFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.vector.ValueHolderHelper;
 
 import com.sun.codemodel.JBlock;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JMod;
+import com.sun.codemodel.JOp;
 import com.sun.codemodel.JVar;
-import org.apache.drill.exec.vector.ValueHolderHelper;
-
-import java.util.List;
 
 public class DrillSimpleFuncHolder extends DrillFuncHolder {
 
@@ -56,12 +56,15 @@
   private String setupBody() {
     return meth("setup", false);
   }
+
   private String evalBody() {
     return meth("eval");
   }
+
   private String resetBody() {
     return meth("reset", false);
   }
+
   private String cleanupBody() {
     return meth("cleanup", false);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
index 1820a2f..8c3d662 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
@@ -245,8 +245,8 @@
     }
   }
 
-  @SuppressWarnings("unused")
-  @FunctionTemplate(names = {"castUNION", "castToUnion"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
+  @FunctionTemplate(names = {"castUNION", "castToUnion"},
+      scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
   public static class CastUnionToUnion implements DrillSimpleFunc{
 
     @Param FieldReader in;
@@ -263,8 +263,8 @@
     }
   }
 
-  @SuppressWarnings("unused")
-  @FunctionTemplate(name = "ASSERT_LIST", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+  @FunctionTemplate(name = "ASSERT_LIST", scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls=NullHandling.INTERNAL)
   public static class CastUnionList implements DrillSimpleFunc {
 
     @Param UnionHolder in;
@@ -286,8 +286,8 @@
     }
   }
 
-  @SuppressWarnings("unused")
-  @FunctionTemplate(name = "IS_LIST", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+  @FunctionTemplate(name = "IS_LIST", scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls=NullHandling.INTERNAL)
   public static class UnionIsList implements DrillSimpleFunc {
 
     @Param UnionHolder in;
@@ -306,8 +306,8 @@
     }
   }
 
-  @SuppressWarnings("unused")
-  @FunctionTemplate(name = "ASSERT_MAP", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+  @FunctionTemplate(name = "ASSERT_MAP", scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls=NullHandling.INTERNAL)
   public static class CastUnionMap implements DrillSimpleFunc {
 
     @Param UnionHolder in;
@@ -329,8 +329,8 @@
     }
   }
 
-  @SuppressWarnings("unused")
-  @FunctionTemplate(names = {"IS_MAP", "IS_STRUCT"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
+  @FunctionTemplate(names = {"IS_MAP", "IS_STRUCT"},
+      scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL)
   public static class UnionIsMap implements DrillSimpleFunc {
 
     @Param UnionHolder in;
@@ -349,7 +349,8 @@
     }
   }
 
-  @FunctionTemplate(names = {"isnotnull", "is not null"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+  @FunctionTemplate(names = {"isnotnull", "is not null"},
+      scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
   public static class IsNotNull implements DrillSimpleFunc {
 
     @Param UnionHolder input;
@@ -364,7 +365,9 @@
     }
   }
 
-  @FunctionTemplate(names = {"isnull", "is null"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+  @FunctionTemplate(names = {"isnull", "is null"},
+      scope = FunctionTemplate.FunctionScope.SIMPLE,
+      nulls = FunctionTemplate.NullHandling.INTERNAL)
   public static class IsNull implements DrillSimpleFunc {
 
     @Param UnionHolder input;
@@ -378,5 +381,4 @@
       out.value = input.isSet == 1 ? 0 : 1;
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/DefaultReturnTypeInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/DefaultReturnTypeInference.java
index 4f6b1ea..8099ffd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/DefaultReturnTypeInference.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/DefaultReturnTypeInference.java
@@ -17,15 +17,15 @@
  */
 package org.apache.drill.exec.expr.fn.output;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.FunctionAttributes;
 import org.apache.drill.exec.expr.fn.FunctionUtils;
 import org.apache.drill.exec.expr.fn.ValueReference;
-
-import java.util.List;
-import java.util.Set;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 /**
  * Return type calculation implementation for functions with return type set as
@@ -45,17 +45,21 @@
   @Override
   public TypeProtos.MajorType getType(List<LogicalExpression> logicalExpressions, FunctionAttributes attributes) {
     if (attributes.getReturnValue().getType().getMinorType() == TypeProtos.MinorType.UNION) {
-      final Set<TypeProtos.MinorType> subTypes = Sets.newHashSet();
-      for (final ValueReference ref : attributes.getParameters()) {
+      Set<TypeProtos.MinorType> subTypes = Sets.newHashSet();
+      for (ValueReference ref : attributes.getParameters()) {
         subTypes.add(ref.getType().getMinorType());
       }
 
-      final TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder()
+      TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder()
           .setMinorType(TypeProtos.MinorType.UNION)
           .setMode(TypeProtos.DataMode.OPTIONAL);
 
-      for (final TypeProtos.MinorType subType : subTypes) {
-        builder.addSubType(subType);
+      for (TypeProtos.MinorType subType : subTypes) {
+        // LATE is not a valid concrete type; used only as a method
+        // annotation.
+        if (subType != TypeProtos.MinorType.LATE) {
+          builder.addSubType(subType);
+        }
       }
       return builder.build();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 0b14fdb..f905687 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -111,7 +111,7 @@
 
   private class HashAggMemoryManager extends RecordBatchMemoryManager {
     @SuppressWarnings("unused")
-    private int valuesRowWidth = 0;
+    private int valuesRowWidth;
 
     HashAggMemoryManager(int outputBatchSize) {
       super(outputBatchSize);
@@ -221,7 +221,6 @@
     return aggregator.getOutputCount();
   }
 
-  @SuppressWarnings("incomplete-switch")
   @Override
   public void buildSchema() throws SchemaChangeException {
     IterOutcome outcome = next(incoming);
@@ -236,15 +235,18 @@
       case STOP:
         state = BatchState.STOP;
         return;
+      default:
+        break;
     }
 
-    this.incomingSchema = incoming.getSchema();
+    incomingSchema = incoming.getSchema();
     if (!createAggregator()) {
       state = BatchState.DONE;
     }
     for (VectorWrapper<?> w : container) {
       AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
     }
+    container.setValueCount(0);
     if (incoming.getRecordCount() > 0) {
       hashAggMemoryManager.update();
     }
@@ -321,10 +323,12 @@
   }
 
   /**
-   * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
-   * and informing the context of the failure state, as well is informing the upstream operators.
+   * Creates a new Aggregator based on the current schema. If setup fails, this
+   * method is responsible for cleaning up and informing the context of the
+   * failure state, as well is informing the upstream operators.
    *
-   * @return true if the aggregator was setup successfully. false if there was a failure.
+   * @return true if the aggregator was setup successfully. false if there was a
+   *         failure.
    */
   private boolean createAggregator() {
     try {
@@ -359,9 +363,7 @@
 
     ErrorCollector collector = new ErrorCollectorImpl();
 
-    int i;
-
-    for (i = 0; i < numGroupByExprs; i++) {
+    for (int i = 0; i < numGroupByExprs; i++) {
       NamedExpression ne = popConfig.getGroupByExprs().get(i);
       final LogicalExpression expr =
           ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
@@ -378,7 +380,7 @@
     }
 
     int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column
-    for (i = 0; i < numAggrExprs; i++) {
+    for (int i = 0; i < numAggrExprs; i++) {
       NamedExpression ne = popConfig.getAggrExprs().get(i);
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 179e6c1..6d06a58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -39,12 +39,13 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(FilterRecordBatch.class);
 
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
@@ -76,51 +77,56 @@
 
   @Override
   protected IterOutcome doWork() {
-    container.zeroVectors();
-    int recordCount = incoming.getRecordCount();
     try {
+      container.zeroVectors();
+      int recordCount = incoming.getRecordCount();
       filter.filterBatch(recordCount);
+      // The container needs the actual number of values in
+      // its contained vectors (not the filtered count).
+      // Not sure the SV4 case is actually supported...
+      container.setRecordCount(
+          sv2 != null ? sv2.getBatchActualRecordCount() : recordCount);
+      return getFinalOutcome(false);
     } catch (SchemaChangeException e) {
       throw new UnsupportedOperationException(e);
     }
-
-    return getFinalOutcome(false);
   }
 
   @Override
   public void close() {
+    clearSv();
+    super.close();
+  }
+
+  private void clearSv() {
     if (sv2 != null) {
       sv2.clear();
     }
     if (sv4 != null) {
       sv4.clear();
     }
-    super.close();
   }
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    if (sv2 != null) {
-      sv2.clear();
-    }
+    clearSv();
 
     switch (incoming.getSchema().getSelectionVectorMode()) {
       case NONE:
         if (sv2 == null) {
           sv2 = new SelectionVector2(oContext.getAllocator());
         }
-        this.filter = generateSV2Filterer();
+        filter = generateSV2Filterer();
         break;
       case TWO_BYTE:
         sv2 = new SelectionVector2(oContext.getAllocator());
-        this.filter = generateSV2Filterer();
+        filter = generateSV2Filterer();
         break;
       case FOUR_BYTE:
         /*
          * Filter does not support SV4 handling. There are couple of minor issues in the
          * logic that handles SV4 + filter should always be pushed beyond sort so disabling
          * it in FilterPrel.
-         *
          */
       default:
         throw new UnsupportedOperationException();
@@ -164,15 +170,14 @@
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
-
   }
 
   protected Filterer generateSV2Filterer() throws SchemaChangeException {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());
-    // Uncomment below lines to enable saving generated code file for debugging
-    // cg.getCodeGenerator().plainJavaCapable(true);
+    cg.getCodeGenerator().plainJavaCapable(true);
+    // Uncomment the following line to enable saving generated code file for debugging
     // cg.getCodeGenerator().saveCodeForDebugging(true);
 
     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index d09e23e..c189367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -32,18 +32,21 @@
   private SelectionVector2 incomingSelectionVector;
   private SelectionVectorMode svMode;
   private TransferPair[] transfers;
+  private RecordBatch outgoing;
 
   @Override
-  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException {
+  public void setup(FragmentContext context, RecordBatch incoming,
+      RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException {
     this.transfers = transfers;
     this.outgoingSelectionVector = outgoing.getSelectionVector2();
     this.svMode = incoming.getSchema().getSelectionVectorMode();
+    this.outgoing = outgoing;
 
-    switch(svMode){
+    switch(svMode) {
     case NONE:
       break;
     case TWO_BYTE:
-      this.incomingSelectionVector = incoming.getSelectionVector2();
+      incomingSelectionVector = incoming.getSelectionVector2();
       break;
     default:
       // SV4 is handled in FilterTemplate4
@@ -52,47 +55,57 @@
     doSetup(context, incoming, outgoing);
   }
 
-  private void doTransfers(){
-    for(TransferPair t : transfers){
+  private void doTransfers() {
+    for (TransferPair t : transfers) {
       t.transfer();
     }
   }
 
   @Override
-  public void filterBatch(int recordCount) throws SchemaChangeException{
+  public void filterBatch(int recordCount) throws SchemaChangeException {
     if (recordCount == 0) {
       outgoingSelectionVector.setRecordCount(0);
+      outgoingSelectionVector.setBatchActualRecordCount(0);
+
+      // Must allocate vectors, then set count to zero. Allocation
+      // is needed since offset vectors must contain at least one
+      // item (the required value of 0 in index location 0).
+      outgoing.getContainer().allocateNew();
+      outgoing.getContainer().setValueCount(0);
       return;
     }
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
       throw new OutOfMemoryException("Unable to allocate filter batch");
     }
 
-    switch(svMode){
+    switch(svMode) {
     case NONE:
       // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
       // batch if possible at once rather than row-by-row
       outgoingSelectionVector.setBatchActualRecordCount(recordCount);
       filterBatchNoSV(recordCount);
+      doTransfers();
+      outgoing.getContainer().setRecordCount(recordCount);
       break;
     case TWO_BYTE:
       // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
       // batch if possible at once rather than row-by-row
-      outgoingSelectionVector.setBatchActualRecordCount(incomingSelectionVector.getBatchActualRecordCount());
+      int actualRowCount = incomingSelectionVector.getBatchActualRecordCount();
+      outgoingSelectionVector.setBatchActualRecordCount(actualRowCount);
       filterBatchSV2(recordCount);
+      doTransfers();
+      outgoing.getContainer().setRecordCount(actualRowCount);
       break;
     default:
       throw new UnsupportedOperationException();
     }
-    doTransfers();
   }
 
   private void filterBatchSV2(int recordCount) throws SchemaChangeException {
     int svIndex = 0;
-    final int count = recordCount;
-    for(int i = 0; i < count; i++){
-      char index = incomingSelectionVector.getIndex(i);
-      if(doEval(index, 0)){
+    for (int i = 0; i < recordCount; i++) {
+      int index = incomingSelectionVector.getIndex(i);
+      if (doEval(index, 0)) {
         outgoingSelectionVector.setIndex(svIndex, index);
         svIndex++;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index ed7b265..ae04579 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
+import java.util.List;
+
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -29,20 +32,20 @@
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.IntVector;
-
-import java.util.List;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Helps to perform limit in a partition within a record batch. Currently only integer type of partition column is
- * supported. This is mainly used for Lateral/Unnest subquery where each output batch from Unnest will contain an
+ * Helps to perform limit in a partition within a record batch. Currently only
+ * integer type of partition column is supported. This is mainly used for
+ * Lateral/Unnest subquery where each output batch from Unnest will contain an
  * implicit column for rowId for each row.
  */
 public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class);
 
-  private SelectionVector2 outgoingSv;
+  private final SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
 
   // Start offset of the records
@@ -50,8 +53,8 @@
   private int numberOfRecords;
   private final List<TransferPair> transfers = Lists.newArrayList();
 
-  // Partition RowId which is currently being processed, this will help to handle cases when rows for a partition id
-  // flows across 2 batches
+  // Partition RowId which is currently being processed, this will help to
+  // handle cases when rows for a partition id flows across 2 batches
   private int partitionId;
   private IntVector partitionColumn;
 
@@ -84,20 +87,20 @@
     container.clear();
     transfers.clear();
 
-    for(final VectorWrapper<?> v : incoming) {
-      final TransferPair pair = v.getValueVector().makeTransferPair(
+    for(VectorWrapper<?> v : incoming) {
+      TransferPair pair = v.getValueVector().makeTransferPair(
         container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
 
-      // Hold the transfer pair target vector for partitionColumn, since before applying limit it transfer all rows
-      // from incoming to outgoing batch
+      // Hold the transfer pair target vector for partitionColumn, since before
+      // applying limit it transfer all rows from incoming to outgoing batch
       String fieldName = v.getField().getName();
       if (fieldName.equals(popConfig.getPartitionColumn())) {
         partitionColumn = (IntVector) pair.getTo();
       }
     }
 
-    final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+    BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
 
     switch(svMode) {
       case NONE:
@@ -118,15 +121,17 @@
   }
 
   /**
-   * Gets the outcome to return from super implementation and then in case of EMIT outcome it refreshes the state of
-   * operator. Refresh is done to again apply limit on all the future incoming batches which will be part of next
+   * Gets the outcome to return from super implementation and then in case of
+   * EMIT outcome it refreshes the state of operator. Refresh is done to again
+   * apply limit on all the future incoming batches which will be part of next
    * record boundary.
+   *
    * @param hasRemainder
    * @return - IterOutcome to send downstream
    */
   @Override
   protected IterOutcome getFinalOutcome(boolean hasRemainder) {
-    final IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
+    IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
 
     // EMIT outcome means leaf operator is UNNEST, hence refresh the state no matter limit is reached or not.
     if (outcomeToReturn == EMIT) {
@@ -137,12 +142,15 @@
 
   @Override
   protected IterOutcome doWork() {
-    final int inputRecordCount = incoming.getRecordCount();
+    int inputRecordCount = incoming.getRecordCount();
     if (inputRecordCount == 0) {
-      setOutgoingRecordCount(0);
-      for (VectorWrapper vw : incoming) {
-        vw.clear();
-      }
+      incoming.getContainer().zeroVectors();
+      outgoingSv.setRecordCount(0);
+      outgoingSv.setBatchActualRecordCount(0);
+      // Must allocate vectors because offset vectors
+      // require a zero in the 0th position.
+      container.allocateNew();
+      container.setRecordCount(0);
       // Release buffer for sv2 (if any)
       if (incomingSv != null) {
         incomingSv.clear();
@@ -150,13 +158,20 @@
       return getFinalOutcome(false);
     }
 
-    for (final TransferPair tp : transfers) {
+    for (TransferPair tp : transfers) {
       tp.transfer();
     }
 
+    // Must report the actual value count as the record
+    // count. This is NOT the input record count when the input
+    // is an SV2.
+    int inputValueCount = incoming.getContainer().getRecordCount();
+    container.setRecordCount(inputValueCount);
+
     // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
     // batch to output record batch and later an SV2Remover copies the needed records.
     outgoingSv.allocateNew(inputRecordCount);
+    outgoingSv.setBatchActualRecordCount(inputValueCount);
     limit(inputRecordCount);
 
     // Release memory for incoming sv (if any)
@@ -167,10 +182,13 @@
   }
 
   /**
-   * limit call when incoming batch has number of records more than the start offset such that it can produce some
-   * output records. After first call of this method recordStartOffset should be 0 since we have already skipped the
+   * limit call when incoming batch has number of records more than the start
+   * offset such that it can produce some output records. After first call of
+   * this method recordStartOffset should be 0 since we have already skipped the
    * required number of records as part of first incoming record batch.
-   * @param inputRecordCount - number of records in incoming batch
+   *
+   * @param inputRecordCount
+   *          - number of records in incoming batch
    */
   private void limit(int inputRecordCount) {
     boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
@@ -179,7 +197,7 @@
     // If partitionId is not -1 that means it's set to previous batch last partitionId
     partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
 
-    for (int i=0; i < inputRecordCount;) {
+    for (int i = 0; i < inputRecordCount;) {
       // Get rowId from current right row
       int currentRowId = getCurrentRowId(i);
 
@@ -191,8 +209,8 @@
           continue;
         }
 
-        // Once the start offset records are skipped then consider rows until numberOfRecords is reached for that
-        // partition
+        // Once the start offset records are skipped then consider rows until
+        // numberOfRecords is reached for that partition
         if (outputAllRecords) {
           updateOutputSV2(svIndex++, i);
         } else if (numberOfRecords > 0) {
@@ -206,7 +224,7 @@
       }
     }
 
-    setOutgoingRecordCount(svIndex);
+    outgoingSv.setRecordCount(svIndex);
   }
 
   private void updateOutputSV2(int svIndex, int incomingIndex) {
@@ -225,15 +243,12 @@
     }
   }
 
-  private void setOutgoingRecordCount(int outputCount) {
-    outgoingSv.setRecordCount(outputCount);
-    container.setRecordCount(outputCount);
-  }
-
   /**
-   * Reset the states for recordStartOffset, numberOfRecords and based on the {@link PartitionLimit} passed to the
-   * operator. It also resets the partitionId since after EMIT outcome there will be new partitionId to consider.
-   * This method is called for the each EMIT outcome received no matter if limit is reached or not.
+   * Reset the states for recordStartOffset, numberOfRecords and based on the
+   * {@link PartitionLimit} passed to the operator. It also resets the
+   * partitionId since after EMIT outcome there will be new partitionId to
+   * consider. This method is called for the each EMIT outcome received no
+   * matter if limit is reached or not.
    */
   private void refreshLimitState() {
     refreshConfigParameter();
@@ -241,8 +256,10 @@
   }
 
   /**
-   * Only resets the recordStartOffset and numberOfRecord based on {@link PartitionLimit} passed to the operator. It
-   * is explicitly called after the limit for each partitionId is met or partitionId changes within an EMIT boundary.
+   * Only resets the recordStartOffset and numberOfRecords based on the
+   * {@link PartitionLimit} passed to the operator. It is explicitly called
+   * after the limit for each partitionId is met or the partitionId changes within
+   * an EMIT boundary.
    */
   private void refreshConfigParameter() {
     // Make sure startOffset is non-negative
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 7b6f349..8215fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -166,7 +166,7 @@
         IterOutcome next = null;
         while (incomingRecordCount == 0) {
           if (getLastKnownOutcome() == EMIT) {
-            throw new UnsupportedOperationException("Currently functions producing complex types as output is not " +
+            throw new UnsupportedOperationException("Currently functions producing complex types as output are not " +
                     "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
                     "function in the projection list of outermost query.");
           }
@@ -227,15 +227,14 @@
     long projectEndTime = System.currentTimeMillis();
     logger.trace("doWork(): projection: records {}, time {} ms", outputRecords, (projectEndTime - projectStartTime));
 
+    setValueCount(outputRecords);
+    recordCount = outputRecords;
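+    // If not all incoming records were projected, remember where to resume on
+    // the next call; otherwise the incoming container has been fully consumed
+    // and can be zeroed.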
     if (outputRecords < incomingRecordCount) {
-      setValueCount(outputRecords);
       hasRemainder = true;
       remainderIndex = outputRecords;
-      recordCount = remainderIndex;
     } else {
-      setValueCount(incomingRecordCount);
+      assert outputRecords == incomingRecordCount;
       incoming.getContainer().zeroVectors();
-      recordCount = outputRecords;
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
@@ -306,11 +305,16 @@
   }
 
   private void setValueCount(int count) {
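+    // Delegate the zero-count case to the container, which knows how to
+    // set up a valid empty batch.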
+    if (count == 0) {
+      container.setEmpty();
+      return;
+    }
     for (ValueVector v : allocationVectors) {
-      final ValueVector.Mutator m = v.getMutator();
-      m.setValueCount(count);
+      v.getMutator().setValueCount(count);
     }
 
+    // Value counts for vectors should have been set via
+    // the transfer pairs or vector copies.
     container.setRecordCount(count);
 
     if (complexWriters == null) {
@@ -322,7 +326,8 @@
     }
   }
 
-  /** hack to make ref and full work together... need to figure out if this is still necessary. **/
+  // hack to make ref and full work together... need to figure out if this is
+  // still necessary.
   private FieldReference getRef(NamedExpression e) {
     return e.getRef();
   }
@@ -340,7 +345,7 @@
     if (!(ex.getExpr() instanceof SchemaPath)) {
       return false;
     }
-    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+    NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
     return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
   }
 
@@ -730,7 +735,8 @@
           String incomingName = vvIn.getField().getName();
           // get the prefix of the name
           String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
-          // if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it
+          // if incoming value vector does not have a prefix, ignore it since
+          // this expression is not referencing it
           if (nameComponents.length <= 1) {
             k++;
             continue;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 2f1aa02..0c11893 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.util.List;
+
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -25,23 +28,19 @@
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-
-import javax.inject.Named;
-import java.util.List;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 public abstract class ProjectorTemplate implements Projector {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class);
 
   private ImmutableList<TransferPair> transfers;
   private SelectionVector2 vector2;
   private SelectionVector4 vector4;
   private SelectionVectorMode svMode;
 
-  public ProjectorTemplate() {
-  }
+  public ProjectorTemplate() { }
 
   @Override
-  public final int projectRecords(RecordBatch incomingRecordBatch, int startIndex, final int recordCount,
+  public final int projectRecords(RecordBatch incomingRecordBatch, int startIndex, int recordCount,
                                   int firstOutputIndex) {
     assert incomingRecordBatch != this; // mixed up incoming and outgoing batches?
     switch (svMode) {
@@ -49,7 +48,7 @@
       throw new UnsupportedOperationException();
 
     case TWO_BYTE:
-      final int count = recordCount;
+      int count = recordCount;
       for (int i = 0; i < count; i++, firstOutputIndex++) {
         try {
           doEval(vector2.getIndex(i), firstOutputIndex);
@@ -60,7 +59,7 @@
       return recordCount;
 
     case NONE:
-      final int countN = recordCount;
+      int countN = recordCount;
       int i;
       for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
         try {
@@ -69,7 +68,7 @@
           throw new UnsupportedOperationException(e);
         }
       }
-      final int totalBatchRecordCount = incomingRecordBatch.getRecordCount();
+      int totalBatchRecordCount = incomingRecordBatch.getRecordCount();
       if (startIndex + recordCount < totalBatchRecordCount || startIndex > 0 ) {
         for (TransferPair t : transfers) {
           t.splitAndTransfer(startIndex, i - startIndex);
@@ -77,7 +76,7 @@
         return i - startIndex;
       }
       for (TransferPair t : transfers) {
-          t.transfer();
+        t.transfer();
       }
       return recordCount;
 
@@ -87,15 +86,18 @@
   }
 
   @Override
-  public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException{
+  public final void setup(FragmentContext context, RecordBatch incoming,
+      RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException {
 
     this.svMode = incoming.getSchema().getSelectionVectorMode();
     switch (svMode) {
     case FOUR_BYTE:
-      this.vector4 = incoming.getSelectionVector4();
+      vector4 = incoming.getSelectionVector4();
       break;
     case TWO_BYTE:
-      this.vector2 = incoming.getSelectionVector2();
+      vector2 = incoming.getSelectionVector2();
+      break;
+    default:
       break;
     }
     this.transfers = ImmutableList.copyOf(transfers);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
index a463519..87afe75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -33,7 +33,7 @@
   public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
-    final int count = outgoing.getNumberOfColumns();
+    int count = outgoing.getNumberOfColumns();
     vvOut = new ValueVector[count];
 
     for (int index = 0; index < count; index++) {
@@ -62,9 +62,9 @@
   }
 
   private int insertRecords(int outgoingPosition, int index, int recordCount) {
-    final int endIndex = index + recordCount;
+    int endIndex = index + recordCount;
 
-    for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){
+    for (int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++) {
       copyEntryIndirect(svIndex, outgoingPosition);
     }
 
@@ -73,11 +73,7 @@
   }
 
   protected void updateCounts(int numRecords) {
-    outgoing.setRecordCount(numRecords);
-
-    for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) {
-      vvOut[vectorIndex].getMutator().setValueCount(numRecords);
-    }
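+    // A single container call now sets both the container's record count and
+    // each output vector's value count.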
+    outgoing.setValueCount(numRecords);
   }
 
   public abstract void copyEntryIndirect(int inIndex, int outIndex);
@@ -85,7 +81,7 @@
   public abstract void copyEntry(int inIndex, int outIndex);
 
   public static void allocateOutgoing(VectorContainer outgoing, int recordCount) {
-    for(VectorWrapper<?> out : outgoing) {
+    for (VectorWrapper<?> out : outgoing) {
       TypeProtos.MajorType type = out.getField().getType();
 
       if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index a8c3622..d45157a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -24,7 +24,6 @@
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 
 public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
@@ -64,14 +63,17 @@
   @Override
   protected IterOutcome doWork() {
     try {
-      copier.copyRecords(0, incoming.getRecordCount());
+      int rowCount = incoming.getRecordCount();
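+      // An empty input still requires a well-formed (empty) output container;
+      // only invoke the copier when there are records to copy.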
+      if (rowCount == 0) {
+        container.setEmpty();
+      } else {
+        copier.copyRecords(0, rowCount);
+      }
     } catch (Exception e) {
       throw new IllegalStateException(e);
     } finally {
       if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
-        for(VectorWrapper<?> v: incoming) {
-          v.clear();
-        }
+        incoming.getContainer().zeroVectors();
         if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
           incoming.getSelectionVector2().clear();
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 8647468..8c9bb47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -17,8 +17,12 @@
  */
 package org.apache.drill.exec.physical.impl.union;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
+
 import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -54,21 +58,19 @@
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Stack;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(UnionAllRecordBatch.class);
 
-  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
   private final List<TransferPair> transfers = Lists.newArrayList();
-  private List<ValueVector> allocationVectors = Lists.newArrayList();
-  private int recordCount = 0;
+  private final List<ValueVector> allocationVectors = Lists.newArrayList();
+  private int recordCount;
   private UnionInputIterator unionInputIterator;
 
   public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
@@ -108,7 +110,8 @@
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     VectorAccessibleUtilities.allocateVectors(container, 0);
-    VectorAccessibleUtilities.setValueCount(container,0);
+    VectorAccessibleUtilities.setValueCount(container, 0);
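+    // Also record the zero row count on the container itself.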
+    container.setRecordCount(0);
   }
 
   @Override
@@ -153,8 +156,8 @@
     return recordCount;
   }
 
-
-  private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
+  private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema)
+      throws ClassTransformationException, IOException, SchemaChangeException {
     Preconditions.checkArgument(batchStatus.batch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
         "Input batch and output batch have different field counthas!");
 
@@ -169,6 +172,7 @@
     batchMemoryManager.allocateVectors(allocationVectors, recordsToProcess);
     recordCount = unionall.unionRecords(batchStatus.recordsProcessed, recordsToProcess, 0);
     VectorUtil.setValueCount(allocationVectors, recordCount);
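+    // Keep the container's record count in sync with the vector value counts.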
+    container.setRecordCount(recordCount);
 
     // save number of records processed so far in batch status.
     batchStatus.recordsProcessed += recordCount;
@@ -254,11 +258,9 @@
     unionall.setup(context, inputBatch, this, transfers);
   }
 
-
   // The output table's column names always follow the left table,
   // where the output type is chosen based on DRILL's implicit casting rules
   private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
-//    outputFields = Lists.newArrayList();
     final Iterator<MaterializedField> leftIter = leftSchema.iterator();
     final Iterator<MaterializedField> rightIter = rightSchema.iterator();
 
@@ -306,7 +308,8 @@
       ++index;
     }
 
-    assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
+    assert !leftIter.hasNext() && !rightIter.hasNext() :
+      "Mismatch of column count should have been detected when validating sqlNode at planning";
   }
 
   private void inferOutputFieldsOneSide(final BatchSchema schema) {
@@ -315,11 +318,6 @@
     }
   }
 
-  private static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
-    return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
-        && (leftField.getType().getMode() == rightField.getType().getMode());
-  }
-
   private class BatchStatusWrappper {
     boolean prefetched;
     final RecordBatch batch;
@@ -344,7 +342,7 @@
   }
 
   private class UnionInputIterator implements Iterator<Pair<IterOutcome, BatchStatusWrappper>> {
-    private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
+    private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
 
     UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
       if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
@@ -416,7 +414,6 @@
     public void remove() {
       throw new UnsupportedOperationException();
     }
-
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
index aedd82e..3ec45a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java
@@ -17,17 +17,17 @@
  */
 package org.apache.drill.exec.physical.impl.union;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.util.List;
+
+import javax.inject.Named;
+
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
-
-import javax.inject.Named;
-import java.util.List;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 public abstract class UnionAllerTemplate implements UnionAller {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllerTemplate.class);
 
   private ImmutableList<TransferPair> transfers;
 
@@ -48,7 +48,8 @@
   }
 
   @Override
-  public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
+  public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+      List<TransferPair> transfers) throws SchemaChangeException {
     this.transfers = ImmutableList.copyOf(transfers);
     doSetup(context, incoming, outgoing);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 14447b9..fa8c954 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -17,8 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.unnest;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+import java.util.List;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.types.TypeProtos;
@@ -44,25 +46,24 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
-
-import java.util.List;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // TODO - handle the case where a user tries to unnest a scalar, should just return the column as is
 public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(UnnestRecordBatch.class);
 
   private final String rowIdColumnName; // name of the field holding the rowId implicit column
-  private IntVector rowIdVector; // vector to keep the implicit rowId column in
+  private IntVector rowIdVector;        // vector to keep the implicit rowId column in
 
   private Unnest unnest = new UnnestImpl();
-  private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
+  private boolean hasRemainder;         // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
-                                        // requires us to stop processing thecurrent row, but not stop processing
+                                        // requires us to stop processing the current row, but not stop processing
                                         // the data.
-  private int remainderIndex = 0;
-  private int recordCount;
+  private int remainderIndex;
   private MaterializedField unnestFieldMetadata;
   // Reference of TypedFieldId for Unnest column. It's always set in schemaChanged method and later used by others
   private TypedFieldId unnestTypedFieldId;
@@ -84,7 +85,6 @@
     }
   }
 
-
   /**
    * Memory manager for Unnest. Estimates the batch size exactly like we do for Flatten.
    */
@@ -102,19 +102,19 @@
       // Get column size of unnest column.
       RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(unnestFieldMetadata.getName());
 
-      final int rowIdColumnSize = TypeHelper.getSize(rowIdVector.getField().getType());
+      int rowIdColumnSize = TypeHelper.getSize(rowIdVector.getField().getType());
 
       // Average rowWidth of single element in the unnest list.
       // subtract the offset vector size from column data size.
-      final int avgRowWidthSingleUnnestEntry = RecordBatchSizer
+      int avgRowWidthSingleUnnestEntry = RecordBatchSizer
           .safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()), columnSize
               .getElementCount());
 
       // Average rowWidth of outgoing batch.
-      final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry + rowIdColumnSize;
+      int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry + rowIdColumnSize;
 
       // Number of rows in outgoing batch
-      final int outputBatchSize = getOutputBatchSize();
+      int outputBatchSize = getOutputBatchSize();
       // Number of rows in outgoing batch
       setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
@@ -127,10 +127,8 @@
 
       updateIncomingStats();
     }
-
   }
 
-
   public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMemoryException {
     super(pop, context);
     pop.addUnnestBatch(this);
@@ -143,7 +141,7 @@
 
   @Override
   public int getRecordCount() {
-    return recordCount;
+    return container.getRecordCount();
   }
 
   @Override
@@ -162,7 +160,6 @@
       "Neither the LateralOutcome is STOP nor executor state is failed");
       logger.debug("Kill received. Stopping all processing");
     state = BatchState.DONE;
-    recordCount = 0;
     hasRemainder = false; // whatever the case, we need to stop processing the current row.
   }
 
@@ -249,9 +246,9 @@
   }
 
   private void setUnnestVector() {
-    final MaterializedField field = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
-    final RepeatedValueVector vector;
-    final ValueVector inVV =
+    MaterializedField field = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
+    RepeatedValueVector vector;
+    ValueVector inVV =
         incoming.getValueAccessorById(field.getValueClass(), unnestTypedFieldId.getFieldIds()).getValueVector();
 
     if (!(inVV instanceof RepeatedValueVector)) {
@@ -273,7 +270,7 @@
   protected IterOutcome doWork() {
     Preconditions.checkNotNull(lateral);
     unnest.setOutputCount(memoryManager.getOutputRowCount());
-    final int incomingRecordCount = incoming.getRecordCount();
+    int incomingRecordCount = incoming.getRecordCount();
 
     int remainingRecordCount = unnest.getUnnestField().getAccessor().getInnerValueCount() - remainderIndex;
 
@@ -281,10 +278,10 @@
     rowIdVector.allocateNew(Math.min(remainingRecordCount, memoryManager.getOutputRowCount()));
 
     //Expected output count is the num of values in the unnest column array
-    final int childCount = incomingRecordCount == 0 ? 0 : remainingRecordCount;
+    int childCount = incomingRecordCount == 0 ? 0 : remainingRecordCount;
 
     // Unnest the data
-    final int outputRecords = childCount == 0 ? 0 : unnest.unnestRecords(childCount);
+    int outputRecords = childCount == 0 ? 0 : unnest.unnestRecords(childCount);
 
     logger.debug("{} values out of {} were processed.", outputRecords, childCount);
     // Keep track of any spill over into another batch. Happens only if you artificially set the output batch
@@ -298,8 +295,7 @@
       remainderIndex = 0;
       logger.debug("IterOutcome: EMIT.");
     }
-    this.recordCount = outputRecords;
-    this.rowIdVector.getMutator().setValueCount(outputRecords);
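+    // A single call sets the record count on the container and the value count
+    // on every output vector, including rowIdVector.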
+    container.setValueCount(outputRecords);
 
     memoryManager.updateOutgoingStats(outputRecords);
     // If the current incoming record has spilled into two batches, we return
@@ -312,19 +308,22 @@
   }
 
   /**
-   * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
-   * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
-   * represent the lists. As the data layout for the actual values in the same in the repeated vector as in the
-   * scalar vector of the same type, we can avoid making individual copies for the column being unnested, and just
-   * use vector copies between the inner vector of the repeated field to the resulting scalar vector from the unnest
-   * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
-   * the end of one of the other vectors while we are copying the data of the other vectors alongside each new unnested
-   * value coming out of the repeated field.)
+   * The data layout is the same for the actual data within a repeated field, as
+   * it is in a scalar vector for the same sql type. For example, a repeated int
+   * vector has a vector of offsets into a regular int vector to represent the
+   * lists. As the data layout for the actual values is the same in the repeated
+   * vector as in the scalar vector of the same type, we can avoid making
+   * individual copies for the column being unnested, and just use vector copies
+   * between the inner vector of the repeated field to the resulting scalar
+   * vector from the unnest operation. This is completed after we determine how
+   * many records will fit (as we will hit either a batch end, or the end of one
+   * of the other vectors while we are copying the data of the other vectors
+   * alongside each new unnested value coming out of the repeated field.)
    */
   private TransferPair getUnnestFieldTransferPair(FieldReference reference) {
-    final int[] typeFieldIds = unnestTypedFieldId.getFieldIds();
-    final Class<?> vectorClass = incoming.getSchema().getColumn(typeFieldIds[0]).getValueClass();
-    final ValueVector unnestField = incoming.getValueAccessorById(vectorClass, typeFieldIds).getValueVector();
+    int[] typeFieldIds = unnestTypedFieldId.getFieldIds();
+    Class<?> vectorClass = incoming.getSchema().getColumn(typeFieldIds[0]).getValueClass();
+    ValueVector unnestField = incoming.getValueAccessorById(vectorClass, typeFieldIds).getValueVector();
 
     TransferPair tp = null;
     if (unnestField instanceof RepeatedMapVector) {
@@ -337,11 +336,11 @@
       }
       logger.error("Cannot cast {} to RepeatedValueVector", unnestField);
       //when incoming recordCount is 0, don't throw exception since the type being seen here is not solid
-      final ValueVector vv = new RepeatedMapVector(unnestField.getField(), oContext.getAllocator(), null);
+      ValueVector vv = new RepeatedMapVector(unnestField.getField(), oContext.getAllocator(), null);
       tp = RepeatedValueVector.class.cast(vv)
           .getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
     } else {
-      final ValueVector vvIn = RepeatedValueVector.class.cast(unnestField).getDataVector();
+      ValueVector vvIn = RepeatedValueVector.class.cast(unnestField).getDataVector();
       // vvIn may be null because of fast schema return for repeated list vectors
       if (vvIn != null) {
         tp = vvIn.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
@@ -351,9 +350,9 @@
   }
 
   private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
-    final List<TransferPair> transfers = Lists.newArrayList();
-    final FieldReference fieldReference = new FieldReference(popConfig.getColumn());
-    final TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
+    List<TransferPair> transfers = Lists.newArrayList();
+    FieldReference fieldReference = new FieldReference(popConfig.getColumn());
+    TransferPair transferPair = getUnnestFieldTransferPair(fieldReference);
     transfers.add(transferPair);
     logger.debug("Added transfer for unnest expression.");
     unnest.close();
@@ -366,16 +365,17 @@
   protected boolean setupNewSchema() throws SchemaChangeException {
     Preconditions.checkNotNull(lateral);
     container.clear();
-    recordCount = 0;
-    final MaterializedField rowIdField = MaterializedField.create(rowIdColumnName, Types.required(TypeProtos
+    MaterializedField rowIdField = MaterializedField.create(rowIdColumnName, Types.required(TypeProtos
         .MinorType.INT));
     this.rowIdVector= (IntVector)TypeHelper.getNewVector(rowIdField, oContext.getAllocator());
     container.add(rowIdVector);
     unnest = new UnnestImpl();
     unnest.setRowIdVector(rowIdVector);
-    final TransferPair tp = resetUnnestTransferPair();
+    TransferPair tp = resetUnnestTransferPair();
     container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
     container.buildSchema(SelectionVectorMode.NONE);
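+    // Allocate the new vectors and present the container as a valid,
+    // empty batch until rows are unnested into it.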
+    container.allocateNew();
+    container.setValueCount(0);
     return true;
   }
 
@@ -388,17 +388,18 @@
    */
   private boolean schemaChanged() throws SchemaChangeException {
     unnestTypedFieldId = checkAndGetUnnestFieldId();
-    final MaterializedField thisField = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
-    final MaterializedField prevField = unnestFieldMetadata;
+    MaterializedField thisField = incoming.getSchema().getColumn(unnestTypedFieldId.getFieldIds()[0]);
+    MaterializedField prevField = unnestFieldMetadata;
     Preconditions.checkNotNull(thisField);
 
-    // isEquivalent may return false if the order of the fields has changed. This usually does not
-    // happen but if it does we end up throwing a spurious schema change exeption
+    // isEquivalent may return false if the order of the fields has changed.
+    // This usually does not happen but if it does we end up throwing a spurious
+    // schema change exception
     if (prevField == null || !prevField.isEquivalent(thisField)) {
-      logger.debug("Schema changed");
-      // We should store the clone of MaterializedField for unnest column instead of reference. When the column is of
-      // type Map and there is change in any children field of the Map then that will update the reference variable and
-      // isEquivalent check will still return true.
+      // We should store the clone of MaterializedField for unnest column
+      // instead of reference. When the column is of type Map and there is
+      // change in any children field of the Map then that will update the
+      // reference variable and isEquivalent check will still return true.
       unnestFieldMetadata = thisField.clone();
       return true;
     }
@@ -430,7 +431,7 @@
   }
 
   private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
-    final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
+    TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
     if (fieldId == null) {
       throw new SchemaChangeException(String.format("Unnest column %s not found inside the incoming record batch. " +
           "This may happen if a wrong Unnest column name is used in the query. Please rerun query after fixing that.",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index fd90268..695e26a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -20,9 +20,15 @@
 import java.util.IdentityHashMap;
 import java.util.Map;
 
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
@@ -38,8 +44,13 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.ZeroVector;
 import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.complex.DictVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.UnionVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,14 +152,28 @@
     }
   }
 
-  private enum CheckMode { COUNTS, ALL, NONE };
+  private enum CheckMode {
+    /** No checking. */
+    NONE,
+    /** Check only batch and container record counts. */
+    COUNTS,
+    /** Check vector value counts. */
+    VECTORS,
+    /** Check special handling of zero-sized vectors.
+     *  This is the most strict form of checking, but
+     *  many operators fail this check.
+     */
+    ZERO_SIZE
+  };
 
   private static final Map<Class<? extends CloseableRecordBatch>, CheckMode> checkRules = buildRules();
 
   private final ErrorReporter errorReporter;
+  private final boolean checkZeroSize;
 
-  public BatchValidator(ErrorReporter errorReporter) {
+  public BatchValidator(ErrorReporter errorReporter, boolean checkZeroSize) {
     this.errorReporter = errorReporter;
+    this.checkZeroSize = checkZeroSize;
   }
 
   /**
@@ -159,9 +184,14 @@
    */
   private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() {
     Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>();
-    rules.put(OperatorRecordBatch.class, CheckMode.ALL);
-    rules.put(ScanBatch.class, CheckMode.ALL);
-    rules.put(ProjectRecordBatch.class, CheckMode.COUNTS);
+    rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
+    rules.put(ScanBatch.class, CheckMode.VECTORS);
+    rules.put(ProjectRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
+    rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
+    rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
+    rules.put(HashAggBatch.class, CheckMode.VECTORS);
+    rules.put(RemovingRecordBatch.class, CheckMode.ZERO_SIZE);
     return rules;
   }
 
@@ -242,15 +272,17 @@
         break;
       }
     }
-    if (checkMode == CheckMode.ALL) {
-      new BatchValidator(reporter).validateBatch(batch, valueCount);
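+    // Vector-level checks run for both modes; ZERO_SIZE additionally verifies
+    // that empty (zero-row) batches have properly initialized offset vectors.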
+    if (checkMode == CheckMode.VECTORS ||
+        checkMode == CheckMode.ZERO_SIZE) {
+      new BatchValidator(reporter, checkMode == CheckMode.ZERO_SIZE)
+          .validateBatch(batch, valueCount);
     }
     return reporter.errorCount() == 0;
   }
 
   public static boolean validate(VectorAccessible batch) {
     ErrorReporter reporter = errorReporter(batch);
-    new BatchValidator(reporter).validateBatch(batch, batch.getRecordCount());
+    new BatchValidator(reporter, false).validateBatch(batch, batch.getRecordCount());
     return reporter.errorCount() == 0;
   }
 
@@ -271,40 +303,59 @@
 
   private void validateWrapper(int rowCount, VectorWrapper<? extends ValueVector> w) {
     if (w instanceof SimpleVectorWrapper) {
-      validateVector(rowCount, w.getValueVector());
+      validateVector(rowCount, w.getValueVector(), true);
     }
   }
 
-  private void validateVector(int expectedCount, ValueVector vector) {
+  private void validateVector(int expectedCount, ValueVector vector, boolean topLevel) {
+    validateVector(vector.getField().getName(), expectedCount, vector, topLevel);
+  }
+
+  private void validateVector(String name, int expectedCount, ValueVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
     if (valueCount != expectedCount) {
-      error(vector.getField().getName(), vector,
+      error(name, vector,
           String.format("Row count = %d, but value count = %d",
               expectedCount, valueCount));
     }
-    validateVector(vector.getField().getName(), vector);
+    validateVector(name, vector, topLevel);
   }
 
-  private void validateVector(String name, ValueVector vector) {
+  private void validateVector(String name, ValueVector vector, boolean topLevel) {
     if (vector instanceof BitVector) {
       validateBitVector(name, (BitVector) vector);
     } else if (vector instanceof RepeatedBitVector) {
-      validateRepeatedBitVector(name, (RepeatedBitVector) vector);
+      validateRepeatedBitVector(name, (RepeatedBitVector) vector, topLevel);
     } else if (vector instanceof NullableVector) {
-      validateNullableVector(name, (NullableVector) vector);
-    } else if (vector instanceof VariableWidthVector) {
-      validateVariableWidthVector(name, (VariableWidthVector) vector);
-    } else if (vector instanceof FixedWidthVector) {
-      validateFixedWidthVector(name, (FixedWidthVector) vector);
+      validateNullableVector(name, (NullableVector) vector, topLevel);
+    } else if (vector instanceof VarCharVector) {
+      validateVarCharVector(name, (VarCharVector) vector, topLevel);
+    } else if (vector instanceof VarBinaryVector) {
+      validateVarBinaryVector(name, (VarBinaryVector) vector, topLevel);
+    } else if (vector instanceof FixedWidthVector ||
+               vector instanceof ZeroVector) {
+      // Not much to do. The only item to check is the vector
+      // count itself, which was already done. There is no inner
+      // structure to check.
     } else if (vector instanceof BaseRepeatedValueVector) {
-      validateRepeatedVector(name, (BaseRepeatedValueVector) vector);
+      validateRepeatedVector(name, (BaseRepeatedValueVector) vector, topLevel);
+    } else if (vector instanceof RepeatedMapVector) {
+      validateRepeatedMapVector(name, (RepeatedMapVector) vector, topLevel);
+    } else if (vector instanceof MapVector) {
+      validateMapVector(name, (MapVector) vector);
+    } else if (vector instanceof RepeatedListVector) {
+      validateRepeatedListVector(name, (RepeatedListVector) vector, topLevel);
+    } else if (vector instanceof DictVector) {
+      validateDictVector(name, (DictVector) vector, topLevel);
+    } else if (vector instanceof UnionVector) {
+      validateUnionVector(name, (UnionVector) vector);
     } else {
       logger.debug("Don't know how to validate vector: {}  of class {}",
           name, vector.getClass().getSimpleName());
     }
   }
 
-  private void validateNullableVector(String name, NullableVector vector) {
+  private void validateNullableVector(String name, NullableVector vector, boolean topLevel) {
     int outerCount = vector.getAccessor().getValueCount();
     ValueVector valuesVector = vector.getValuesVector();
     int valueCount = valuesVector.getAccessor().getValueCount();
@@ -314,55 +365,43 @@
           outerCount, valueCount));
     }
     verifyIsSetVector(vector, (UInt1Vector) vector.getBitsVector());
-    validateVector(name + "-values", valuesVector);
+    validateVector(name + "-values", valuesVector, topLevel);
   }
 
-  private void validateVariableWidthVector(String name, VariableWidthVector vector) {
-
-    // Offsets are in the derived classes. Handle only VarChar for now.
-
-    if (vector instanceof VarCharVector) {
-      validateVarCharVector(name, (VarCharVector) vector);
-    } else if (vector instanceof VarBinaryVector) {
-      validateVarBinaryVector(name, (VarBinaryVector) vector);
-    } else {
-      logger.debug("Don't know how to validate vector: {}  of class {}",
-          name, vector.getClass().getSimpleName());
-    }
-  }
-
-  private void validateVarCharVector(String name, VarCharVector vector) {
+  private void validateVarCharVector(String name, VarCharVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
 
     // Disabled because a large number of operators
-    // set up offset vectors wrongly.
-    if (valueCount == 0) {
+    // set up offset vectors incorrectly.
+    if (valueCount == 0 && (!topLevel || !checkZeroSize)) {
       return;
     }
 
     int dataLength = vector.getBuffer().writerIndex();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, valueCount, dataLength);
+    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false,
+        valueCount, dataLength, topLevel);
   }
 
-  private void validateVarBinaryVector(String name, VarBinaryVector vector) {
+  private void validateVarBinaryVector(String name, VarBinaryVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
 
     // Disabled because a large number of operators
     // set up offset vectors wrongly.
-    if (valueCount == 0) {
+    if (valueCount == 0 && !checkZeroSize) {
       return;
     }
 
     int dataLength = vector.getBuffer().writerIndex();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, valueCount, dataLength);
+    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false,
+        valueCount, dataLength, topLevel);
   }
 
-  private void validateRepeatedVector(String name, BaseRepeatedValueVector vector) {
+  private void validateRepeatedVector(String name, BaseRepeatedValueVector vector, boolean topLevel) {
     ValueVector dataVector = vector.getDataVector();
     int dataLength = dataVector.getAccessor().getValueCount();
     int valueCount = vector.getAccessor().getValueCount();
     int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
-        true, valueCount, dataLength);
+        true, valueCount, dataLength, topLevel);
 
     if (dataLength != itemCount) {
       error(name, vector, String.format(
@@ -373,16 +412,14 @@
     // Special handling of repeated VarChar vectors
     // The nested data vectors are not quite exactly like top-level vectors.
 
-    if (dataVector instanceof VariableWidthVector) {
-      validateVariableWidthVector(name + "-data", (VariableWidthVector) dataVector);
-    }
+    validateVector(name + "-data", dataVector, false);
   }
 
-  private void validateRepeatedBitVector(String name, RepeatedBitVector vector) {
+  private void validateRepeatedBitVector(String name, RepeatedBitVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
     int maxBitCount = valueCount * 8;
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), true, valueCount, maxBitCount);
+        vector.getOffsetVector(), true, valueCount, maxBitCount, topLevel);
     BitVector dataVector = vector.getDataVector();
     if (dataVector.getAccessor().getValueCount() != elementCount) {
       error(name, vector, String.format(
@@ -392,12 +429,6 @@
     validateBitVector(name + "-data", dataVector);
   }
 
-  private void validateFixedWidthVector(String name, FixedWidthVector vector) {
-    // Not much to do. The only item to check is the vector
-    // count itself, which was already done. There is no inner
-    // structure to check.
-  }
-
   private void validateBitVector(String name, BitVector vector) {
     BitVector.Accessor accessor = vector.getAccessor();
     int valueCount = accessor.getValueCount();
@@ -410,8 +441,66 @@
     }
   }
 
+  private void validateMapVector(String name, MapVector vector) {
+    int valueCount = vector.getAccessor().getValueCount();
+    for (ValueVector child: vector) {
+      validateVector(valueCount, child, false);
+    }
+  }
+
+  private void validateRepeatedMapVector(String name,
+      RepeatedMapVector vector, boolean topLevel) {
+    int valueCount = vector.getAccessor().getValueCount();
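+    // Integer.MAX_VALUE disables the maximum-offset check here; the children
+    // are instead validated against the element count derived from the offsets.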
+    int elementCount = validateOffsetVector(name + "-offsets",
+        vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+    for (ValueVector child: vector) {
+      validateVector(elementCount, child, false);
+    }
+  }
+
+  private void validateDictVector(String name, DictVector vector, boolean topLevel) {
+    int valueCount = vector.getAccessor().getValueCount();
+    int elementCount = validateOffsetVector(name + "-offsets",
+        vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+    validateVector(elementCount, vector.getKeys(), false);
+    validateVector(elementCount, vector.getValues(), false);
+  }
+
+  private void validateRepeatedListVector(String name,
+      RepeatedListVector vector, boolean topLevel) {
+    int valueCount = vector.getAccessor().getValueCount();
+    int elementCount = validateOffsetVector(name + "-offsets",
+        vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+    validateVector(elementCount, vector.getDataVector(), false);
+  }
+
+  private void validateUnionVector(String name, UnionVector vector) {
+    int valueCount = vector.getAccessor().getValueCount();
+    MapVector internalMap = vector.getTypeMap();
+    for (MinorType type : vector.getSubTypes()) {
+      if (type == MinorType.LATE) {
+        error(name, vector, String.format(
+            "Union vector includes illegal type LATE",
+            type.name()));
+        continue;
+      }
+
+      // Warning: do not call getMember(type), doing so will create an
+      // empty map vector that causes validation to fail.
+      ValueVector child = internalMap.getChild(type.name());
+      if (child == null) {
+        error(name, vector, String.format(
+            "Union vector includes type %s, but the internal map has no matching member",
+            type.name()));
+      } else {
+        validateVector(name + "-type-" + type.name(),
+            valueCount, child, false);
+      }
+    }
+  }
+
   private int validateOffsetVector(String name, UInt4Vector offsetVector,
-      boolean repeated, int valueCount, int maxOffset) {
+      boolean repeated, int valueCount, int maxOffset, boolean topLevel) {
     UInt4Vector.Accessor accessor = offsetVector.getAccessor();
     int offsetCount = accessor.getValueCount();
     // TODO: Disabled because a large number of operators
@@ -427,13 +516,19 @@
           "Outer vector has %d values, but offset vector has %d, expected %d",
           valueCount, offsetCount, valueCount + 1));
     }
-    if (valueCount == 0) {
+    if (valueCount == 0 && (!topLevel || !checkZeroSize)) {
       return 0;
     }
 
     // First value must be zero in current version.
 
-    int prevOffset = accessor.get(0);
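+    // An empty offset vector means the required initial 0 entry was never
+    // written; report the error rather than letting the exception propagate.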
+    int prevOffset;
+    try {
+      prevOffset = accessor.get(0);
+    } catch (IndexOutOfBoundsException e) {
+      error(name, offsetVector, "Offset (0) must be 0 but was undefined");
+      return 0;
+    }
     if (prevOffset != 0) {
       error(name, offsetVector, "Offset (0) must be 0 but was " + prevOffset);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index e6046a3..32c7119 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -17,6 +17,12 @@
  */
 package org.apache.drill.exec.physical.impl.xsort.managed;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -38,12 +44,8 @@
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * External sort batch: a sort batch which can spill to disk in
@@ -161,7 +163,7 @@
  */
 
 public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(ExternalSortBatch.class);
 
   // For backward compatibility, masquerade as the original
   // external sort. Else, some tests don't pass.
@@ -183,8 +185,6 @@
 
   private BatchSchema schema;
 
-//  private SelectionVector4 sv4;
-
   /**
    * Iterates over the final, sorted results.
    */
@@ -193,7 +193,7 @@
   private enum SortState { START, LOAD, DELIVER, DONE }
   private SortState sortState = SortState.START;
 
-  private SortConfig sortConfig;
+  private final SortConfig sortConfig;
 
   private SortImpl sortImpl;
 
@@ -201,9 +201,9 @@
 
   private boolean firstBatchOfSchema;
 
-  private VectorContainer outputWrapperContainer;
+  private final VectorContainer outputWrapperContainer;
 
-  private SelectionVector4 outputSV4;
+  private final SelectionVector4 outputSV4;
 
   // WARNING: The enum here is used within this class. But, the members of
   // this enum MUST match those in the (unmanaged) ExternalSortBatch since
@@ -386,7 +386,8 @@
       return STOP;
     }
 
-    // If we are here that means there is some data to be returned downstream. We have to prepare output container
+    // If we are here, there is some data to be returned downstream.
+    // We have to prepare the output container.
     prepareOutputContainer(resultsIterator);
     return getFinalOutcome();
   }
@@ -630,14 +631,19 @@
   }
 
   /**
-   * Based on first batch for this schema or not it either clears off the output container or just zero down the vectors
-   * Then calls {@link SortResults#updateOutputContainer(VectorContainer, SelectionVector4, IterOutcome, BatchSchema)}
-   * to populate the output container of sort with results data. It is done this way for the support of EMIT outcome
-   * where SORT will return results multiple time in same minor fragment so there needs a way to preserve the
-   * ValueVector references across output batches.
-   * However it currently only supports SortResults of type EmptyResults and MergeSortWrapper. We don't expect
-   * spilling to happen in EMIT outcome scenario hence it's not supported now.
-   * @param sortResults - Final sorted result which contains the container with data
+   * Depending on whether this is the first batch for this schema, either clears
+   * the output container or just zeroes down the vectors, then calls
+   * {@link SortResults#updateOutputContainer(VectorContainer, SelectionVector4, IterOutcome, BatchSchema)}
+   * to populate the output container of the sort with the result data. It is done
+   * this way to support the EMIT outcome, where SORT will return results multiple
+   * times in the same minor fragment, so there needs to be a way to preserve the
+   * ValueVector references across output batches. However, it currently only
+   * supports SortResults of type EmptyResults and MergeSortWrapper. We don't
+   * expect spilling to happen in the EMIT outcome scenario, hence it's not
+   * supported now.
+   *
+   * @param sortResults
+   *          - Final sorted result which contains the container with data
    */
   private void prepareOutputContainer(SortResults sortResults) {
     if (firstBatchOfSchema) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index cb79091..a389303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -34,9 +34,11 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
+  private static final Logger logger = LoggerFactory.getLogger(AbstractRecordBatch.class);
 
   protected final VectorContainer container;
   protected final T popConfig;
@@ -55,12 +57,12 @@
     this(popConfig, context, true, context.newOperatorContext(popConfig));
   }
 
-  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
+  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema) throws OutOfMemoryException {
     this(popConfig, context, buildSchema, context.newOperatorContext(popConfig));
   }
 
-  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
-      final OperatorContext oContext) {
+  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema,
+      OperatorContext oContext) {
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = oContext;
@@ -109,14 +111,14 @@
     return popConfig;
   }
 
-  public final IterOutcome next(final RecordBatch b) {
+  public final IterOutcome next(RecordBatch b) {
     if(!context.getExecutorState().shouldContinue()) {
       return IterOutcome.STOP;
     }
     return next(0, b);
   }
 
-  public final IterOutcome next(final int inputIndex, final RecordBatch b){
+  public final IterOutcome next(int inputIndex, RecordBatch b){
     IterOutcome next;
     stats.stopProcessing();
     try{
@@ -187,7 +189,7 @@
           break;
       }
       return lastOutcome;
-    } catch (final SchemaChangeException e) {
+    } catch (SchemaChangeException e) {
       lastOutcome = IterOutcome.STOP;
       throw new DrillRuntimeException(e);
     } catch (Exception e) {
@@ -213,7 +215,7 @@
   }
 
   @Override
-  public void kill(final boolean sendUpstream) {
+  public void kill(boolean sendUpstream) {
     killIncoming(sendUpstream);
   }
 
@@ -235,21 +237,18 @@
   }
 
   @Override
-  public TypedFieldId getValueVectorId(final SchemaPath path) {
+  public TypedFieldId getValueVectorId(SchemaPath path) {
     return container.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
     return container.getValueAccessorById(clazz, ids);
   }
 
   @Override
   public WritableBatch getWritableBatch() {
-//    logger.debug("Getting writable batch.");
-    final WritableBatch batch = WritableBatch.get(this);
-    return batch;
-
+    return WritableBatch.get(this);
   }
 
   @Override
@@ -259,7 +258,7 @@
 
   @Override
   public VectorContainer getContainer() {
-    return  container;
+    return container;
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
index dda4ef5..d9b5f4b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java
@@ -34,7 +34,6 @@
  */
 public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperator> extends
     AbstractUnaryRecordBatch<T> implements TableFunctionContract{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
 
   protected RecordBatch incoming;
   protected LateralContract lateral;
@@ -60,4 +59,3 @@
     lateral = incoming;
   }
 }
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index ec34344..9627780 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -63,9 +63,7 @@
     IterOutcome upstream = next(incoming);
     if (state != BatchState.FIRST && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
       do {
-        for (final VectorWrapper<?> w : incoming) {
-          w.clear();
-        }
+        incoming.getContainer().zeroVectors();
       } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0);
     }
     if (state == BatchState.FIRST) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 4ee50f5..6d4e057 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -530,4 +530,20 @@
     sb.append("]");
     return sb.toString();
   }
+
+  /**
+   * Safely set this container to an empty batch. An empty batch is not
+   * fully empty: offset vectors must contain a single 0 entry in their
+   * first position.
+   */
+  public void setEmpty() {
+    // May not be needed; retaining for safety.
+    zeroVectors();
+    // Better to only allocate minimum-size offset vectors,
+    // but no good way to do that presently.
+    allocateNew();
+    // The "fill empties" logic will set the zero
+    // in the offset vectors that need it.
+    setValueCount(0);
+  }
 }
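
As a usage note, an operator that produces no rows can now rely on the new
helper instead of hand-rolling the offset-vector setup. A hedged sketch
(the surrounding operator code and names are illustrative):

    // Produce a well-formed empty batch: vectors are allocated and each
    // offset vector receives its leading 0 entry.
    outgoingContainer.setEmpty();
    return IterOutcome.OK;   // downstream sees record count 0 with valid vectors
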
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index d3fcc5a..da42b27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -36,15 +36,17 @@
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 public class JSONRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(JSONRecordReader.class);
 
   public static final long DEFAULT_ROWS_PER_BATCH = BaseValueVector.INITIAL_VALUE_ALLOCATION;
 
@@ -57,7 +59,7 @@
   private final DrillFileSystem fileSystem;
   private JsonProcessor jsonReader;
   private int recordCount;
-  private long runningRecordCount = 0;
+  private long runningRecordCount;
   private final FragmentContext fragmentContext;
   private final boolean enableAllTextMode;
   private final boolean enableNanInf;
@@ -67,7 +69,7 @@
   private long parseErrorCount;
   private final boolean skipMalformedJSONRecords;
   private final boolean printSkippedMalformedJSONRecordLineNumber;
-  ReadState write = null;
+  private ReadState write;
 
   /**
    * Create a JSON Record Reader that uses a file based input stream.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index ec838d0..6965bf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.vector.complex.fn;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -32,17 +30,21 @@
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import io.netty.buffer.DrillBuf;
 
 public class JsonReader extends BaseJsonReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
-      .getLogger(JsonReader.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(JsonReader.class);
   public final static int MAX_RECORD_SIZE = 128 * 1024;
 
   private final WorkingBuffer workingBuffer;
@@ -59,7 +61,7 @@
    */
   private final List<ListWriter> emptyArrayWriters = Lists.newArrayList();
 
-  private FieldSelection selection;
+  private final FieldSelection selection;
 
   private JsonReader(Builder builder) {
     super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList);
@@ -74,18 +76,17 @@
   }
 
   public static class Builder {
-    private  DrillBuf managedBuf;
-    private  WorkingBuffer workingBuffer;
-    private  List<SchemaPath> columns;
-    private  MapVectorOutput mapOutput;
-    private  ListVectorOutput listOutput;
-    private  String currentFieldName = "<none>";
-    private  boolean readNumbersAsDouble;
-    private  boolean skipOuterList;
-    private  boolean allTextMode;
-    private  boolean enableNanInf;
-    private  boolean enableEscapeAnyChar;
-
+    private final DrillBuf managedBuf;
+    private final WorkingBuffer workingBuffer;
+    private List<SchemaPath> columns;
+    private final MapVectorOutput mapOutput;
+    private final ListVectorOutput listOutput;
+    private final String currentFieldName = "<none>";
+    private boolean readNumbersAsDouble;
+    private boolean skipOuterList;
+    private boolean allTextMode;
+    private boolean enableNanInf;
+    private boolean enableEscapeAnyChar;
 
     public Builder(DrillBuf managedBuf) {
       this.managedBuf = managedBuf;
@@ -225,7 +226,7 @@
    */
   private void writeData(MapWriter map, FieldSelection selection,
       boolean moveForward) throws IOException {
-    //
+
     map.start();
     try {
       outside: while (true) {
@@ -256,28 +257,32 @@
         case START_ARRAY:
           writeData(map.list(fieldName));
           break;
+
         case START_OBJECT:
           if (!writeMapDataIfTyped(map, fieldName)) {
             writeData(map.map(fieldName), childSelection, false);
           }
           break;
+
         case END_OBJECT:
           break outside;
 
-        case VALUE_FALSE: {
+        case VALUE_FALSE:
           map.bit(fieldName).writeBit(0);
           break;
-        }
-        case VALUE_TRUE: {
+
+        case VALUE_TRUE:
           map.bit(fieldName).writeBit(1);
           break;
-        }
+
         case VALUE_NULL:
           // do nothing as we don't have a type.
           break;
+
         case VALUE_NUMBER_FLOAT:
           map.float8(fieldName).writeFloat8(parser.getDoubleValue());
           break;
+
         case VALUE_NUMBER_INT:
           if (this.readNumbersAsDouble) {
             map.float8(fieldName).writeFloat8(parser.getDoubleValue());
@@ -285,6 +290,7 @@
             map.bigInt(fieldName).writeBigInt(parser.getLongValue());
           }
           break;
+
         case VALUE_STRING:
           handleString(parser, map, fieldName);
           break;
@@ -299,12 +305,11 @@
     } finally {
       map.end();
     }
-
   }
 
   private void writeDataAllText(MapWriter map, FieldSelection selection,
       boolean moveForward) throws IOException {
-    //
+
     map.start();
     outside: while (true) {
 
@@ -316,7 +321,7 @@
         t = parser.getCurrentToken();
         moveForward = true;
       }
-       if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
+      if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
         return;
       }
 
@@ -335,11 +340,13 @@
       case START_ARRAY:
         writeDataAllText(map.list(fieldName));
         break;
+
       case START_OBJECT:
         if (!writeMapDataIfTyped(map, fieldName)) {
           writeDataAllText(map.map(fieldName), childSelection, false);
         }
         break;
+
       case END_OBJECT:
         break outside;
 
@@ -351,6 +358,7 @@
       case VALUE_STRING:
         handleString(parser, map, fieldName);
         break;
+
       case VALUE_NULL:
         // do nothing as we don't have a type.
         break;
@@ -361,7 +369,6 @@
       }
     }
     map.end();
-
   }
 
   /**
@@ -427,20 +434,21 @@
             writeData(list.map(), FieldSelection.ALL_VALID, false);
           }
           break;
+
         case END_ARRAY:
           addIfNotInitialized(list);
         case END_OBJECT:
           break outside;
 
         case VALUE_EMBEDDED_OBJECT:
-        case VALUE_FALSE: {
+        case VALUE_FALSE:
           list.bit().writeBit(0);
           break;
-        }
-        case VALUE_TRUE: {
+
+        case VALUE_TRUE:
           list.bit().writeBit(1);
           break;
-        }
+
         case VALUE_NULL:
           throw UserException
               .unsupportedError()
@@ -449,9 +457,11 @@
                       + "Please set `store.json.all_text_mode` to true to read lists containing nulls. "
                       + "Be advised that this will treat JSON null values as a string containing the word 'null'.")
               .build(logger);
+
         case VALUE_NUMBER_FLOAT:
           list.float8().writeFloat8(parser.getDoubleValue());
           break;
+
         case VALUE_NUMBER_INT:
           if (this.readNumbersAsDouble) {
             list.float8().writeFloat8(parser.getDoubleValue());
@@ -459,9 +469,11 @@
             list.bigInt().writeBigInt(parser.getLongValue());
           }
           break;
+
         case VALUE_STRING:
           handleString(parser, list);
           break;
+
         default:
           throw UserException.dataReadError()
               .message("Unexpected token %s", parser.getCurrentToken())
@@ -472,7 +484,6 @@
       }
     }
     list.endList();
-
   }
 
   /**
@@ -493,11 +504,13 @@
       case START_ARRAY:
         writeDataAllText(list.list());
         break;
+
       case START_OBJECT:
         if (!writeListDataIfTyped(list)) {
           writeDataAllText(list.map(), FieldSelection.ALL_VALID, false);
         }
         break;
+
       case END_ARRAY:
         addIfNotInitialized(list);
       case END_OBJECT:
@@ -512,17 +525,16 @@
       case VALUE_STRING:
         handleString(parser, list);
         break;
+
       default:
         throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s",
             parser.getCurrentToken()).build(logger);
       }
     }
     list.endList();
-
   }
 
   public DrillBuf getWorkBuf() {
     return workingBuffer.getBuf();
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
index ed242be..5be86ce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedStreaming.java
@@ -25,7 +25,6 @@
 
 @Category({SlowTest.class})
 public class TestTpchDistributedStreaming extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchDistributedStreaming.class);
 
   private void testDistributed(String fileName) throws Exception{
     String query = getFile(fileName);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
index a30bda9..3b5a133 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
@@ -151,7 +151,7 @@
 
   private static void checkForError(SingleRowSet batch, String expectedError) {
     CapturingReporter cr = new CapturingReporter();
-    new BatchValidator(cr).validateBatch(batch.vectorAccessible(), batch.rowCount());
+    new BatchValidator(cr, true).validateBatch(batch.vectorAccessible(), batch.rowCount());
     assertTrue(cr.errors.size() > 0);
     Pattern p = Pattern.compile(expectedError);
     Matcher m = p.matcher(cr.errors.get(0));
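
The test now passes an extra boolean to the BatchValidator constructor. A
hedged sketch of the same usage pattern outside the test (the error-handling
code is illustrative):

    CapturingReporter reporter = new CapturingReporter();
    new BatchValidator(reporter, true)
        .validateBatch(batch.vectorAccessible(), batch.rowCount());
    if (!reporter.errors.isEmpty()) {
      // react to the reported vector or record-count problems
    }
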
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 55a32c8..18f5fae 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -35,9 +35,6 @@
 import java.util.List;
 import java.util.zip.GZIPOutputStream;
 
-import org.apache.drill.exec.util.JsonStringHashMap;
-import org.apache.drill.exec.util.Text;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -46,17 +43,21 @@
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.RepeatedBigIntVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestJsonReader extends BaseTestQuery {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(TestJsonReader.class);
 
   @BeforeClass
   public static void setupTestFiles() {
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 60b3ec2..d2bfb87 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -82,12 +82,12 @@
   }
 
   @Override
-  public FieldReader getReader(){
+  public FieldReader getReader() {
     return reader;
   }
 
   @Override
-  public int getValueCapacity(){
+  public int getValueCapacity() {
     return Math.min(bits.getValueCapacity(), values.getValueCapacity());
   }
 
@@ -118,7 +118,7 @@
   }
 
   @Override
-  public int getBufferSize(){
+  public int getBufferSize() {
     return values.getBufferSize() + bits.getBufferSize();
   }
 
@@ -133,7 +133,7 @@
   }
 
   @Override
-  public int getAllocatedSize(){
+  public int getAllocatedSize() {
     return bits.getAllocatedSize() + values.getAllocatedSize();
   }
 
@@ -170,7 +170,7 @@
 
   @Override
   public void allocateNew() {
-    if(!allocateNewSafe()){
+    if (!allocateNewSafe()) {
       throw new OutOfMemoryException("Failure while allocating buffer.");
     }
   }
@@ -216,7 +216,7 @@
 
   <#if type.major != "VarLen">
   @Override
-  public int getValueWidth(){
+  public int getValueWidth() {
     return bits.getValueWidth() + ${type.width};
   }
   </#if>
@@ -245,12 +245,12 @@
   }
 
   @Override
-  public int getByteCapacity(){
+  public int getByteCapacity() {
     return values.getByteCapacity();
   }
 
   @Override
-  public int getCurrentSizeInBytes(){
+  public int getCurrentSizeInBytes() {
     return values.getCurrentSizeInBytes();
   }
 
@@ -301,12 +301,12 @@
   }
 
   @Override
-  public TransferPair getTransferPair(BufferAllocator allocator){
+  public TransferPair getTransferPair(BufferAllocator allocator) {
     return new TransferImpl(getField(), allocator);
   }
 
   @Override
-  public TransferPair getTransferPair(String ref, BufferAllocator allocator){
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
     return new TransferImpl(getField().withPath(ref), allocator);
   }
 
@@ -315,9 +315,10 @@
     return new TransferImpl((Nullable${minor.class}Vector) to);
   }
 
-  public void transferTo(Nullable${minor.class}Vector target){
+  public void transferTo(Nullable${minor.class}Vector target) {
     bits.transferTo(target.bits);
     values.transferTo(target.values);
+    target.mutator.setCount = mutator.setCount;
     <#if type.major == "VarLen">
     target.mutator.lastSet = mutator.lastSet;
     </#if>
@@ -335,21 +336,21 @@
   private class TransferImpl implements TransferPair {
     private final Nullable${minor.class}Vector to;
 
-    public TransferImpl(MaterializedField field, BufferAllocator allocator){
+    public TransferImpl(MaterializedField field, BufferAllocator allocator) {
       to = new Nullable${minor.class}Vector(field, allocator);
     }
 
-    public TransferImpl(Nullable${minor.class}Vector to){
+    public TransferImpl(Nullable${minor.class}Vector to) {
       this.to = to;
     }
 
     @Override
-    public Nullable${minor.class}Vector getTo(){
+    public Nullable${minor.class}Vector getTo() {
       return to;
     }
 
     @Override
-    public void transfer(){
+    public void transfer() {
       transferTo(to);
     }
 
@@ -374,7 +375,7 @@
     return mutator;
   }
 
-  public ${minor.class}Vector convertToRequiredVector(){
+  public ${minor.class}Vector convertToRequiredVector() {
     ${minor.class}Vector v = new ${minor.class}Vector(getField().getOtherNullableVersion(), allocator);
     if (v.data != null) {
       v.data.release(1);
@@ -392,14 +393,14 @@
     return bits.getValueCapacity();
   }
 
-  public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
+  public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from) {
     final Accessor fromAccessor = from.getAccessor();
     if (!fromAccessor.isNull(fromIndex)) {
       mutator.set(thisIndex, fromAccessor.get(fromIndex));
     }
   }
 
-  public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
+  public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from) {
     <#if type.major == "VarLen">
     mutator.fillEmpties(thisIndex);
     </#if>
@@ -407,7 +408,7 @@
     bits.getMutator().setSafe(thisIndex, 1);
   }
 
-  public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
+  public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from) {
     <#if type.major == "VarLen">
     mutator.fillEmpties(thisIndex);
     </#if>
@@ -485,12 +486,12 @@
       return isSet(index) == 0;
     }
 
-    public int isSet(int index){
+    public int isSet(int index) {
       return bAccessor.get(index);
     }
 
     <#if type.major == "VarLen">
-    public long getStartEnd(int index){
+    public long getStartEnd(int index) {
       return vAccessor.getStartEnd(index);
     }
 
@@ -500,7 +501,7 @@
     }
 
     </#if>
-    public void get(int index, Nullable${minor.class}Holder holder){
+    public void get(int index, Nullable${minor.class}Holder holder) {
       vAccessor.get(index, holder);
       holder.isSet = bAccessor.get(index);
 
@@ -543,12 +544,12 @@
 
     private Mutator() { }
 
-    public ${valuesName} getVectorWithValues(){
+    public ${valuesName} getVectorWithValues() {
       return values;
     }
 
     @Override
-    public void setIndexDefined(int index){
+    public void setIndexDefined(int index) {
       bits.getMutator().set(index, 1);
     }
 
@@ -587,7 +588,7 @@
     private void fillEmpties(int index) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       valuesMutator.fillEmpties(lastSet, index+1);
-      while(index > bits.getValueCapacity()) {
+      while (index > bits.getValueCapacity()) {
         bits.reAlloc();
       }
       lastSet = index;
@@ -834,7 +835,7 @@
 
     </#if>
     @Override
-    public void generateTestData(int valueCount){
+    public void generateTestData(int valueCount) {
       bits.getMutator().generateTestDataAlt(valueCount);
       values.getMutator().generateTestData(valueCount);
       <#if type.major = "VarLen">lastSet = valueCount;</#if>
@@ -842,7 +843,7 @@
     }
 
     @Override
-    public void reset(){
+    public void reset() {
       setCount = 0;
       <#if type.major = "VarLen">lastSet = -1;</#if>
     }
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index fe28277..ef83f4b 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -62,9 +62,9 @@
   public static final int NULL_MARKER = 0;
   public static final String TYPE_VECTOR_NAME = "types";
   public static final String INTERNAL_MAP_NAME = "internal";
-  
+
   private static final MajorType MAJOR_TYPES[] = new MajorType[MinorType.values().length];
-  
+
   static {
     MAJOR_TYPES[MinorType.MAP.ordinal()] = Types.optional(MinorType.MAP);
     MAJOR_TYPES[MinorType.LIST.ordinal()] = Types.optional(MinorType.LIST);
@@ -94,16 +94,16 @@
    * the union vector, but must then implement the required vector serialization/
    * deserialization and other functionality.
    */
-  
+
   private MapVector internalMap;
-  
+
   /**
-   * Cached type vector. The vector's permament location is in the
+   * Cached type vector. The vector's permanent location is in the
    * internal map, it is cached for performance. Call
    * {@link #getTypeVector()} to get the cached copy, or to refresh
    * the cache from the internal map if not set.
    */
-  
+
   private UInt1Vector typeVector;
 
   /**
@@ -114,7 +114,7 @@
    * array is not. It will be repopulated upon first access to
    * the deserialized vectors.
    */
-  
+
   private ValueVector cachedSubtypes[] = new ValueVector[MinorType.values().length];
 
   private FieldReader reader;
@@ -122,13 +122,13 @@
   private final CallBack callBack;
 
   public UnionVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
-    
+
     // The metadata may start off listing subtypes for which vectors
     // do not actually exist. It appears that the semantics are to list
     // the subtypes that *could* appear. For example, in a sort we may
     // have two types: one batch has type A, the other type B, but the
     // batches must list both A and B as subtypes.
-    
+
     this.field = field.clone();
     this.allocator = allocator;
     this.internalMap = new MapVector(INTERNAL_MAP_NAME, allocator, callBack);
@@ -141,26 +141,25 @@
   public BufferAllocator getAllocator() {
     return allocator;
   }
-  
+
   public List<MinorType> getSubTypes() {
     return field.getType().getSubTypeList();
   }
-  
+
   @SuppressWarnings("unchecked")
   public <T extends ValueVector> T subtype(MinorType type) {
     return (T) cachedSubtypes[type.ordinal()];
   }
 
-  
   /**
    * Add an externally-created subtype vector. The vector must represent a type that
    * does not yet exist in the union, and must be of OPTIONAL mode. Does not call
    * the callback since the client (presumably) knows that it is adding the type.
    * The caller must also allocate the buffer for the vector.
-   * 
+   *
    * @param vector subtype vector to add
    */
-  
+
   public void addType(ValueVector vector) {
     MinorType type = vector.getField().getType().getMinorType();
     assert subtype(type) == null;
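
A hedged usage sketch for addType() under the preconditions spelled out above
(the vector, field, and allocator names are illustrative, not from this PR):

    // The caller creates and allocates the subtype vector itself; the type
    // must not already exist in the union and must be OPTIONAL.
    NullableIntVector intVector = new NullableIntVector(
        MaterializedField.create("int", Types.optional(MinorType.INT)), allocator);
    intVector.allocateNew();
    unionVector.addType(intVector);
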
@@ -170,7 +169,7 @@
     internalMap.putChild(type.name(), vector);
     addSubType(type);
   }
-  
+
   public void addSubType(MinorType type) {
     if (field.getType().getSubTypeList().contains(type)) {
       return;
@@ -186,12 +185,12 @@
    * "Classic" way to add a subtype when working directly with a union vector.
    * Creates the vector, adds it to the internal structures and creates a
    * new buffer of the default size.
-   * 
+   *
    * @param type the type to add
    * @param vectorClass class of the vector to create
    * @return typed form of the new value vector
    */
-  
+
   private <T extends ValueVector> T classicAddType(MinorType type, Class<? extends ValueVector> vectorClass) {
     int vectorCount = internalMap.size();
     @SuppressWarnings("unchecked")
@@ -252,14 +251,14 @@
       </#if>
     </#list>
   </#list>
-  
+
   /**
    * Add or get a type member given the type.
-   * 
+   *
    * @param type the type of the vector to retrieve
    * @return the (potentially newly created) vector that backs the given type
    */
-  
+
   public ValueVector getMember(MinorType type) {
     switch (type) {
     case MAP:
@@ -285,7 +284,7 @@
       throw new UnsupportedOperationException(type.toString());
     }
   }
-  
+
   @SuppressWarnings("unchecked")
   public <T extends ValueVector> T member(MinorType type) {
     return (T) getMember(type);
@@ -301,7 +300,7 @@
     }
     return typeVector;
   }
-  
+
   @VisibleForTesting
   public MapVector getTypeMap() {
     return internalMap;
@@ -396,11 +395,11 @@
   /**
    * Add a vector that matches the argument. Transfer the buffer from the argument
    * to the new vector.
-   * 
+   *
    * @param v the vector to clone and add
    * @return the cloned vector that now holds the data from the argument
    */
-  
+
   public ValueVector addVector(ValueVector v) {
     String name = v.getField().getType().getMinorType().name().toLowerCase();
     MajorType type = v.getField().getType();
@@ -413,14 +412,14 @@
     addSubType(minorType);
     return newVector;
   }
-  
+
   // Called from SchemaUtil
-  
+
   public ValueVector setFirstType(ValueVector v, int newValueCount) {
-    
+
     // We can't check that this really is the first subtype since
     // the subtypes can be declared before vectors are added.
-    
+
     Preconditions.checkState(accessor.getValueCount() == 0);
     final ValueVector vv = addVector(v);
     MinorType type = v.getField().getType().getMinorType();
@@ -567,7 +566,7 @@
       case MinorType.DICT_VALUE:
         return getDict().getAccessor().getObject(index);
       default:
-        throw new UnsupportedOperationException("Cannot support type: " + MinorType.valueOf(type));
+        throw new UnsupportedOperationException("Cannot support type: " + MinorType.forNumber(type));
       }
     }
 
@@ -586,11 +585,11 @@
 
     @Override
     public boolean isNull(int index) {
-      
+
       // Note that type code == 0 is used to indicate a null.
       // This corresponds to the LATE type, not the NULL type.
       // This is presumably an artifact of an earlier implementation...
-      
+
       return getTypeVector().getAccessor().get(index) == NULL_MARKER;
     }
 
@@ -606,6 +605,25 @@
     @Override
     public void setValueCount(int valueCount) {
       UnionVector.this.valueCount = valueCount;
+
+      // Get each claimed child type. This will force creation
+      // of the vector in the internal map so that we can properly
+      // set the size of that vector.
+      //
+      // TODO: This is a waste, but the semantics of this class
+      // are murky: it is not fully supported. Without this, if we
+      // ask for the child type vector later, it will have zero values
+      // even if this Union has a non-zero count.
+      //
+      // A better long-term solution would be to not add types that
+      // are not needed, or to remove those types here. In either case,
+      // the internal structure will be consistent with the claimed
+      // metadata (every type in metadata will have a matching vector in
+      // the internal map).
+
+      for (MinorType type : getSubTypes()) {
+        getMember(type);
+      }
       internalMap.getMutator().setValueCount(valueCount);
     }
 
@@ -654,7 +672,7 @@
     public void setType(int index, MinorType type) {
       getTypeVector().getMutator().setSafe(index, type.getNumber());
     }
-    
+
     public void setNull(int index) {
       getTypeVector().getMutator().setSafe(index, NULL_MARKER);
     }
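
The workaround in setValueCount() above matters because of cases like the
following hedged sketch (simplified, not taken from an operator):

    // The union's metadata claims INT as a subtype, but no INT vector has
    // been materialized yet.
    union.getMutator().setValueCount(10);
    ValueVector intVector = union.getMember(MinorType.INT);
    // Without the getSubTypes() loop, intVector would be created only now,
    // after setValueCount(), and would still report zero values even though
    // the union claims 10.
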
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 4be33f2..58a4e58 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -191,16 +191,19 @@
   }
 
   /**
-   * Equals method doesn't check for the children list of fields here. When a batch is sent over network then it is
-   * serialized along with the Materialized Field which also contains information about the internal vectors like
-   * offset and bits. While deserializing, these vectors are treated as children of parent vector. If a operator on
-   * receiver side like Sort receives a schema in buildSchema phase and then later on receives another batch, that
-   * will result in schema change and query will fail. This is because second batch schema will contain information
-   * about internal vectors like offset and bits which will not be present in first batch schema. For ref: See
-   * TestSort#testSortWithRepeatedMapWithExchanges
+   * The equals method does not check the children list of fields here. When a
+   * batch is sent over the network it is serialized along with the
+   * MaterializedField, which also contains information about the internal
+   * vectors such as offsets and bits. While deserializing, these vectors are
+   * treated as children of the parent vector. If an operator on the receiver
+   * side, such as Sort, receives a schema in the buildSchema phase and later
+   * receives another batch, that would register as a schema change and the
+   * query would fail, because the second batch schema contains information
+   * about the internal vectors (offsets and bits) that is not present in the
+   * first batch schema. For reference, see TestSort#testSortWithRepeatedMapWithExchanges.
    *
-   * @param obj
-   * @return
+   * @param obj the other materialized field
+   * @return true if the name and type are equal
    */
   @Override
   public boolean equals(Object obj) {
@@ -219,51 +222,59 @@
   }
 
   /**
-   * Determine if one column is logically equivalent to another. This is
-   * a tricky issue. The rules here:
+   * Determine if one column is logically equivalent to another. This is a
+   * tricky issue. The rules here:
    * <ul>
    * <li>The other schema is assumed to be non-null (unlike
    * <tt>equals()</tt>).</li>
-   * <li>Names must be identical, ignoring case. (Drill, like SQL, is
-   * case insensitive.)
+   * <li>Names must be identical, ignoring case. (Drill, like SQL, is case
+   * insensitive.)
    * <li>Type, mode, precision and scale must be identical.</li>
-   * <li>Child columns are ignored unless the type is a map. That is, the
-   * hidden "$bits" and "$offsets" vector columns are not compared, as
-   * one schema may be an "original" (without these hidden columns) while
-   * the other may come from a vector (which has the hidden columns added.
-   * The standard <tt>equals()</tt> comparison does consider hidden
-   * columns.</li>
+   * <li>Child columns are ignored unless the type is a map. That is, the hidden
+   * "$bits" and "$offsets" vector columns are not compared, as one schema may
+   * be an "original" (without these hidden columns) while the other may come
+   * from a vector (which has the hidden columns added). The standard
+   * <tt>equals()</tt> comparison does consider hidden columns.</li>
    * <li>For maps, the child columns are compared recursively. This version
-   * requires that the two sets of columns appear in the same order. (It
-   * assumes it is being used in a context where column indexes make
-   * sense.) Operators that want to reconcile two maps that differ only in
-   * column order need a different comparison.</li>
+   * requires that the two sets of columns appear in the same order. (It assumes
+   * it is being used in a context where column indexes make sense.) Operators
+   * that want to reconcile two maps that differ only in column order need a
+   * different comparison.</li>
    * </ul>
    * <ul>
-   * Note: Materialized Field and ValueVector has 1:1 mapping which means for each ValueVector there is a materialized
-   * field associated with it. So when we replace or add a ValueVector in a VectorContainer then we create new
-   * Materialized Field object for the new vector. This works fine for Primitive type ValueVectors but for ValueVector
-   * which are of type {@link org.apache.drill.exec.vector.complex.AbstractContainerVector} there is some differences on
-   * how Materialized field and ValueVector objects are updated inside the container which both ValueVector and
-   * Materialized Field object both mutable.
+   * Note: MaterializedField and ValueVector have a 1:1 mapping, which means
+   * that for each ValueVector there is a MaterializedField associated with it.
+   * So when we replace or add a ValueVector in a VectorContainer we create a
+   * new MaterializedField object for the new vector. This works fine for
+   * primitive-type ValueVectors, but for ValueVectors of type
+   * {@link org.apache.drill.exec.vector.complex.AbstractContainerVector} there
+   * are some differences in how the MaterializedField and ValueVector objects
+   * are updated inside the container, since both the ValueVector and the
+   * MaterializedField object are mutable.
    * <p>
-   * For example: For cases of MapVector it can so happen that only the children field type changed but
-   * the parent Map type and name remained same. In these cases we replace the children field ValueVector from parent
-   * MapVector inside main batch container, with new type of vector. Thus the reference of parent MaprVector inside
-   * batch container remains same but the reference of children field ValueVector stored inside MapVector get's updated.
-   * During this update it also replaces the Materialized field for that children field which is stored in childrens
-   * list of the parent MapVector Materialized Field.
-   * Since the children list of parent Materialized Field is updated, this make this class mutable. Hence there should
-   * not be any check for object reference equality here but instead there should be deep comparison which is what
-   * this method is now performing. Since if we have object reference check then in above cases it will return true for
-   * 2 Materialized Field object whose children field list is different which is not correct. Same holds true for
+   * For example: for a MapVector it can happen that only a child field's type
+   * changed while the parent map's type and name remained the same. In that
+   * case we replace the child field's ValueVector inside the parent MapVector
+   * of the main batch container with a vector of the new type. The reference
+   * to the parent MapVector inside the batch container stays the same, but the
+   * reference to the child ValueVector stored inside the MapVector gets
+   * updated. During this update the MaterializedField for that child field,
+   * stored in the children list of the parent MapVector's MaterializedField,
+   * is replaced as well. Since the children list of the parent
+   * MaterializedField is updated, this class is effectively mutable. Hence
+   * there should be no check for object-reference equality here; instead a
+   * deep comparison is needed, which is what this method now performs. With a
+   * reference check, the cases above would return true for two
+   * MaterializedField objects whose children field lists differ, which is not
+   * correct. The same holds true for the
    * {@link MaterializedField#isEquivalent(MaterializedField)} method.
    * </p>
    * </ul>
    *
-   * @param other another field
-   * @return <tt>true</tt> if the columns are identical according to the
-   * above rules, <tt>false</tt> if they differ
+   * @param other
+   *          another field
+   * @return <tt>true</tt> if the columns are identical according to the above
+   *         rules, <tt>false</tt> if they differ
    */
 
   public boolean isEquivalent(MaterializedField other) {
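
A hedged example of the rules above (the field names and types are
illustrative):

    MaterializedField a = MaterializedField.create("amount", Types.optional(MinorType.VARCHAR));
    MaterializedField b = MaterializedField.create("AMOUNT", Types.optional(MinorType.VARCHAR));
    a.isEquivalent(b);   // true: names compare case-insensitively, hidden children are ignored
    MaterializedField c = MaterializedField.create("amount", Types.required(MinorType.VARCHAR));
    a.isEquivalent(c);   // false: mode differs (OPTIONAL vs. REQUIRED)
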
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 4d20ee9..e791757 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.vector.complex;
 
-import io.netty.buffer.DrillBuf;
-
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -43,11 +41,12 @@
 import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier;
 import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Ordering;
 import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
 
+import io.netty.buffer.DrillBuf;
+
 public class MapVector extends AbstractMapVector {
 
   public final static MajorType TYPE = Types.required(MinorType.MAP);
@@ -99,7 +98,7 @@
 
   @Override
   public void setInitialCapacity(int numRecords) {
-    for (final ValueVector v : this) {
+    for (ValueVector v : this) {
       v.setInitialCapacity(numRecords);
     }
   }
@@ -110,7 +109,7 @@
       return 0;
     }
     long buffer = 0;
-    for (final ValueVector v : this) {
+    for (ValueVector v : this) {
       buffer += v.getBufferSize();
     }
 
@@ -120,20 +119,20 @@
   @Override
   public int getAllocatedSize() {
     int size = 0;
-    for (final ValueVector v : this) {
+    for (ValueVector v : this) {
       size += v.getAllocatedSize();
     }
     return size;
   }
 
   @Override
-  public int getBufferSizeFor(final int valueCount) {
+  public int getBufferSizeFor(int valueCount) {
     if (valueCount == 0) {
       return 0;
     }
 
     long bufferSize = 0;
-    for (final ValueVector v : this) {
+    for (ValueVector v : this) {
       bufferSize += v.getBufferSizeFor(valueCount);
     }
 
@@ -197,7 +196,7 @@
         // (This is similar to what happens in ScanBatch where the children cannot be added till they are
         // read). To take care of this, we ensure that the hashCode of the MaterializedField does not
         // include the hashCode of the children but is based only on MaterializedField$key.
-        final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+        ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
         if (allocate && to.size() != preSize) {
           newVector.allocateNew();
         }
@@ -207,7 +206,7 @@
 
     @Override
     public void transfer() {
-      for (final TransferPair p : pairs) {
+      for (TransferPair p : pairs) {
         p.transfer();
       }
       to.valueCount = from.valueCount;
@@ -241,7 +240,7 @@
       return 0;
     }
 
-    final Ordering<ValueVector> natural = new Ordering<ValueVector>() {
+    Ordering<ValueVector> natural = new Ordering<ValueVector>() {
       @Override
       public int compare(@Nullable ValueVector left, @Nullable ValueVector right) {
         return Ints.compare(
@@ -261,12 +260,12 @@
 
   @Override
   public void load(SerializedField metadata, DrillBuf buf) {
-    final List<SerializedField> fields = metadata.getChildList();
+    List<SerializedField> fields = metadata.getChildList();
     valueCount = metadata.getValueCount();
 
     int bufOffset = 0;
-    for (final SerializedField child : fields) {
-      final MaterializedField fieldDef = MaterializedField.create(child);
+    for (SerializedField child : fields) {
+      MaterializedField fieldDef = MaterializedField.create(child);
 
       ValueVector vector = getChild(fieldDef.getName());
       if (vector == null) {
@@ -362,7 +361,7 @@
 
     @Override
     public void setValueCount(int valueCount) {
-      for (final ValueVector v : getChildren()) {
+      for (ValueVector v : getChildren()) {
         v.getMutator().setValueCount(valueCount);
       }
       setMapValueCount(valueCount);
@@ -377,7 +376,7 @@
 
   @Override
   public void clear() {
-    for (final ValueVector v : getChildren()) {
+    for (ValueVector v : getChildren()) {
       v.clear();
     }
     valueCount = 0;
@@ -385,8 +384,8 @@
 
   @Override
   public void close() {
-    final Collection<ValueVector> vectors = getChildren();
-    for (final ValueVector v : vectors) {
+    Collection<ValueVector> vectors = getChildren();
+    for (ValueVector v : vectors) {
       v.close();
     }
     vectors.clear();
diff --git a/logical/src/main/java/org/apache/drill/common/expression/IfExpression.java b/logical/src/main/java/org/apache/drill/common/expression/IfExpression.java
index d02285b..7b04c74 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/IfExpression.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/IfExpression.java
@@ -21,20 +21,16 @@
 import java.util.List;
 import java.util.Set;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class IfExpression extends LogicalExpressionBase {
-  static final Logger logger = LoggerFactory.getLogger(IfExpression.class);
 
   public final IfCondition ifCondition;
   public final LogicalExpression elseExpression;
@@ -57,7 +53,6 @@
       this.condition = condition;
       this.expression = expression;
     }
-
   }
 
   @Override
@@ -96,7 +91,6 @@
       Preconditions.checkNotNull(conditions);
       return new IfExpression(pos, conditions, elseExpression, outputType);
     }
-
   }
 
   @Override
@@ -155,5 +149,4 @@
 
     return cost / i;
   }
-
 }