DRILL-7456: Batch count fixes for 12 operators

Enables batch validation for 12 additional operators:

* MergingRecordBatch
* OrderedPartitionRecordBatch
* RangePartitionRecordBatch
* TraceRecordBatch
* UnionAllRecordBatch
* UnorderedReceiverBatch
* UnpivotMapsRecordBatch
* WindowFrameRecordBatch
* TopNBatch
* HashJoinBatch
* ExternalSortBatch
* WriterRecordBatch

Fixes the issues found by those checks so that this set of
operators now passes all of them.
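
For reference, a minimal sketch of the invariant these batch checks
enforce (illustrative only; the class and method names below are
hypothetical, and the real checks cover more cases such as selection
vectors):

    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.VectorWrapper;

    // Sketch: an outgoing container must carry a record count, and every
    // value vector in it must report a matching value count.
    public class BatchCountCheckSketch {
      public static boolean isCountConsistent(VectorContainer container) {
        if (!container.hasRecordCount()) {
          return false; // the operator never set a record count
        }
        int recordCount = container.getRecordCount();
        for (VectorWrapper<?> wrapper : container) {
          int valueCount = wrapper.getValueVector().getAccessor().getValueCount();
          if (valueCount != recordCount) {
            return false; // vector and container counts disagree
          }
        }
        return true;
      }
    }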

Includes code cleanup in many files touched during this
work.
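
Many of the fixes follow the same pattern, visible throughout the diff
below: instead of looping over the outgoing vectors to set value counts
(or omitting the counts entirely), operators now finish a batch through
the container helpers. A minimal sketch of that pattern as the diff uses
it (the wrapper method itself is illustrative, not part of the change):

    import org.apache.drill.exec.record.VectorContainer;

    public class OutgoingCountSketch {
      // Finalize the outgoing container's counts for one batch.
      public static void finishBatch(VectorContainer container, int outputRecords) {
        if (outputRecords == 0) {
          // Empty batches still need their counts (and, per the removed
          // comments, offset vectors) initialized; setEmpty() handles that.
          container.setEmpty();
        } else {
          // Sets the value count on every vector and the container's
          // record count in one call.
          container.setValueCount(outputRecords);
        }
      }
    }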

closes #1906
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 79b40ee..3e658cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -58,6 +58,8 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.DrillBuf;
 
@@ -65,7 +67,7 @@
  * Record batch used for a particular scan. Operators against one or more
  */
 public class ScanBatch implements CloseableRecordBatch {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(ScanBatch.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScanBatch.class);
 
   /** Main collection of fields' value vectors. */
@@ -79,7 +81,7 @@
   private BatchSchema schema;
   private final Mutator mutator;
   private boolean done;
-  private Iterator<Map<String, String>> implicitColumns;
+  private final Iterator<Map<String, String>> implicitColumns;
   private Map<String, String> implicitValues;
   private final BufferAllocator allocator;
   private final List<Map<String, String>> implicitColumnList;
@@ -179,18 +181,19 @@
   }
 
   /**
-   * This method is to perform scan specific actions when the scan needs to clean/reset readers and return NONE status
+   * Performs scan-specific actions when the scan needs to clean/reset
+   * readers and return NONE status.
+   *
    * @return NONE
    */
   private IterOutcome cleanAndReturnNone() {
     if (isRepeatableScan) {
       readers = readerList.iterator();
-      return IterOutcome.NONE;
     } else {
       releaseAssets(); // All data has been read. Release resource.
       done = true;
-      return IterOutcome.NONE;
     }
+    return IterOutcome.NONE;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 0257abc..52fe0a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -86,8 +86,8 @@
   private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
 
   public static final MajorType HVtype = MajorType.newBuilder()
-    .setMinorType(MinorType.INT /* dataType */ )
-    .setMode(DataMode.REQUIRED /* mode */ )
+    .setMinorType(MinorType.INT /* dataType */)
+    .setMode(DataMode.REQUIRED /* mode */)
     .build();
 
   // The vector containers storing all the inner rows
@@ -160,7 +160,7 @@
     }
     this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
     tmpBatchesList = new ArrayList<>();
-    if ( numPartitions > 1 ) {
+    if (numPartitions > 1) {
       allocateNewCurrentBatchAndHV();
     }
   }
@@ -194,7 +194,7 @@
       while (vci.hasNext()) {
         VectorWrapper<?> vw = vci.next();
         // If processing a spilled container, skip the last column (HV)
-        if ( cycleNum > 0 && ! vci.hasNext() ) { break; }
+        if (cycleNum > 0 && ! vci.hasNext()) { break; }
         ValueVector vv = vw.getValueVector();
         ValueVector newVV = TypeHelper.getNewVector(vv.getField(), allocator);
         newVC.add(newVV); // add first to allow dealloc in case of an OOM
@@ -213,7 +213,7 @@
       newVC.setRecordCount(0);
       success = true;
     } finally {
-      if ( !success ) {
+      if (!success) {
         newVC.clear(); // in case of an OOM
       }
     }
@@ -233,11 +233,12 @@
   /**
    *  Spills if needed
    */
-  public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
+  public void appendInnerRow(VectorContainer buildContainer, int ind,
+      int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
 
-    int pos = currentBatch.appendRow(buildContainer,ind);
+    int pos = currentBatch.appendRow(buildContainer, ind);
     currHVVector.getMutator().set(pos - 1, hashCode);   // store the hash value in the new column
-    if ( pos == recordsPerBatch ) {
+    if (pos == recordsPerBatch) {
       boolean needsSpill = isSpilled || calc.shouldSpill();
       completeAnInnerBatch(true, needsSpill);
     }
@@ -245,12 +246,11 @@
 
   /**
    *  Outer always spills when batch is full
-   *
    */
   public void appendOuterRow(int hashCode, int recordsProcessed) {
     int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
     currHVVector.getMutator().set(pos - 1, hashCode);   // store the hash value in the new column
-    if ( pos == recordsPerBatch ) {
+    if (pos == recordsPerBatch) {
       completeAnOuterBatch(true);
     }
   }
@@ -269,7 +269,7 @@
    * (that is, more rows are coming) - initialize with a new current batch for that partition
    * */
   private void completeABatch(boolean toInitialize, boolean needsSpill) {
-    if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
+    if (currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
       currentBatch.add(currHVVector);
       currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
       tmpBatchesList.add(currentBatch);
@@ -283,10 +283,10 @@
     } else {
       freeCurrentBatchAndHVVector();
     }
-    if ( needsSpill ) { // spill this batch/partition and free its memory
+    if (needsSpill) { // spill this batch/partition and free its memory
       spillThisPartition();
     }
-    if ( toInitialize ) { // allocate a new batch and HV vector
+    if (toInitialize) { // allocate a new batch and HV vector
       allocateNewCurrentBatchAndHV();
     } else {
       currentBatch = null;
@@ -331,11 +331,11 @@
   }
 
   public void spillThisPartition() {
-    if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
+    if (tmpBatchesList.size() == 0) { return; } // in case empty - nothing to spill
     logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
 
     // If this is the first spill for this partition, create an output stream
-    if ( writer == null ) {
+    if (writer == null) {
       final String side = processingOuter ? "outer" : "inner";
       final String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
       spillFile = spillSet.getNextSpillFile(suffix);
@@ -488,10 +488,10 @@
    */
   private void closeWriterInternal(boolean doDeleteFile) {
     try {
-      if ( writer != null ) {
+      if (writer != null) {
         spillSet.close(writer);
       }
-      if ( doDeleteFile && spillFile != null ) {
+      if (doDeleteFile && spillFile != null) {
         spillSet.delete(spillFile);
       }
     } catch (IOException ioe) {
@@ -512,7 +512,7 @@
    * have been consumed.
    */
   public void buildContainersHashTableAndHelper() throws SchemaChangeException {
-    if ( isSpilled ) { return; } // no building for spilled partitions
+    if (isSpilled) { return; } // no building for spilled partitions
     containers = new ArrayList<>();
     hashTable.updateInitialCapacity((int) getNumInMemoryRecords());
     for (int curr = 0; curr < partitionBatchesCount; curr++) {
@@ -520,7 +520,7 @@
       final int currentRecordCount = nextBatch.getRecordCount();
 
       // For every incoming build batch, we create a matching helper batch
-      if ( ! semiJoin ) { hjHelper.addNewBatch(currentRecordCount); }
+      if (! semiJoin) { hjHelper.addNewBatch(currentRecordCount); }
 
       // Holder contains the global index where the key is hashed into using the hash table
       final IndexPointer htIndex = new IndexPointer();
@@ -528,7 +528,7 @@
       assert nextBatch != null;
       assert probeBatch != null;
 
-      hashTable.updateIncoming(nextBatch, probeBatch );
+      hashTable.updateIncoming(nextBatch, probeBatch);
 
       IntVector HV_vector = (IntVector) nextBatch.getLast();
 
@@ -543,7 +543,7 @@
          * the current record index and batch index. This will be used
          * later when we probe and find a match.
          */
-        if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
+        if (! semiJoin) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
       }
 
       containers.add(nextBatch);
@@ -570,11 +570,11 @@
   }
 
   private void freeCurrentBatchAndHVVector() {
-    if ( currentBatch != null ) {
+    if (currentBatch != null) {
       currentBatch.clear();
       currentBatch = null;
     }
-    if ( currHVVector != null ) {
+    if (currHVVector != null) {
       currHVVector.clear();
       currHVVector = null;
     }
@@ -591,13 +591,13 @@
         vc.clear();
       }
     }
-    while ( tmpBatchesList.size() > 0 ) {
+    while (tmpBatchesList.size() > 0) {
       VectorContainer vc = tmpBatchesList.remove(0);
       vc.clear();
     }
     closeWriterInternal(deleteFile);
     clearHashTableAndHelper();
-    if ( containers != null ) {
+    if (containers != null) {
       containers.clear();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index dc063f9..eab38ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -25,18 +29,14 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
@@ -51,7 +51,6 @@
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
 import org.apache.drill.exec.memory.BaseAllocator;
@@ -65,10 +64,10 @@
 import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.Comparator;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.HashPartition;
 import org.apache.drill.exec.physical.impl.common.SpilledState;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.planner.common.JoinControl;
@@ -85,49 +84,61 @@
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- *   This class implements the runtime execution for the Hash-Join operator
- *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
- *
- *   This implementation splits the incoming Build side rows into multiple Partitions, thus allowing spilling of
- *   some of these partitions to disk if memory gets tight. Each partition is implemented as a {@link HashPartition}.
- *   After the build phase is over, in the most general case, some of the partitions were spilled, and the others
- *   are in memory. Each of the partitions in memory would get a {@link HashTable} built.
- *      Next the Probe side is read, and each row is key matched with a Build partition. If that partition is in
- *   memory, then the key is used to probe and perform the join, and the results are added to the outgoing batch.
- *   But if that build side partition was spilled, then the matching Probe size partition is spilled as well.
- *      After all the Probe side was processed, we are left with pairs of spilled partitions. Then each pair is
- *   processed individually (that Build partition should be smaller than the original, hence likely fit whole into
- *   memory to allow probing; if not -- see below).
- *      Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- *   the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
- *   read and divided into new partitions, which in turn may spill again (and again...).
- *   The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
- *   indicating that the number of partitions chosen was too small.
+ * This class implements the runtime execution for the Hash-Join operator
+ * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
+ * <p>
+ * This implementation splits the incoming Build side rows into multiple
+ * Partitions, thus allowing spilling of some of these partitions to disk if
+ * memory gets tight. Each partition is implemented as a {@link HashPartition}.
+ * After the build phase is over, in the most general case, some of the
+ * partitions were spilled, and the others are in memory. Each of the partitions
+ * in memory would get a {@link HashTable} built.
+ * <p>
+ * Next the Probe side is read, and each row is key matched with a Build
+ * partition. If that partition is in memory, then the key is used to probe and
+ * perform the join, and the results are added to the outgoing batch. But if
+ * that build side partition was spilled, then the matching Probe side partition
+ * is spilled as well.
+ * <p>
+ * After all of the Probe side has been processed, we are left with pairs of spilled
+ * partitions. Then each pair is processed individually (that Build partition
+ * should be smaller than the original, hence likely fit whole into memory to
+ * allow probing; if not -- see below).
+ * <p>
+ * Processing of each spilled pair is EXACTLY like processing the original
+ * Build/Probe incomings. (In fact, the {@link #innerNext()} method calls
+ * itself recursively!) Thus the spilled build partition is read and divided
+ * into new partitions, which in turn may spill again (and again...). The code
+ * tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or
+ * greater) is a waste, indicating that the number of partitions chosen was too
+ * small.
  */
+
 public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin {
-  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class);
 
   /**
    * The maximum number of records within each internal batch.
    */
-  private int RECORDS_PER_BATCH; // internal batches
+  private final int RECORDS_PER_BATCH; // internal batches
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
-  private boolean semiJoin;
-  private boolean joinIsLeftOrFull;
-  private boolean joinIsRightOrFull;
+  private final boolean semiJoin;
+  private final boolean joinIsLeftOrFull;
+  private final boolean joinIsRightOrFull;
   private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
 
   // Join conditions
@@ -141,13 +152,15 @@
   private final List<NamedExpression> rightExpr;
 
   /**
-   * Names of the join columns. This names are used in order to help estimate the size of the {@link HashTable}s.
+   * Names of the join columns. These names are used to help estimate
+   * the size of the {@link HashTable}s.
    */
   private final Set<String> buildJoinColumns;
 
   // Fields used for partitioning
   /**
-   * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
+   * The number of {@link HashPartition}s. This is configured via a system
+   * option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
 
@@ -155,8 +168,8 @@
    * The master class used to generate {@link HashTable}s.
    */
   private ChainedHashTable baseHashTable;
-  private MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
-  private MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
+  private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
+  private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
   private boolean canSpill = true;
   private boolean wasKilled; // a kill was received, may need to clean spilled partns
 
@@ -174,18 +187,18 @@
   private BatchSchema probeSchema;
 
   // Whether this HashJoin is used for a row-key based join
-  private boolean isRowKeyJoin = false;
+  private final boolean isRowKeyJoin;
 
-  private JoinControl joinControl;
+  private final JoinControl joinControl;
 
   // An iterator over the build side hash table (only applicable for row-key joins)
-  private boolean buildComplete = false;
+  private boolean buildComplete;
 
   // indicates if we have previously returned an output batch
   private boolean firstOutputBatch = true;
 
   private int rightHVColPosition;
-  private BufferAllocator allocator;
+  private final BufferAllocator allocator;
   // Local fields for left/right incoming - may be replaced when reading from spilled
   private RecordBatch buildBatch;
   private RecordBatch probeBatch;
@@ -193,27 +206,27 @@
   /**
    * Flag indicating whether or not the first data holding build batch needs to be fetched.
    */
-  private MutableBoolean prefetchedBuild = new MutableBoolean(false);
+  private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
   /**
    * Flag indicating whether or not the first data holding probe batch needs to be fetched.
    */
-  private MutableBoolean prefetchedProbe = new MutableBoolean(false);
+  private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
 
   // For handling spilling
-  private SpillSet spillSet;
+  private final SpillSet spillSet;
   HashJoinPOP popConfig;
 
-  private int originalPartition = -1; // the partition a secondary reads from
+  private final int originalPartition = -1; // the partition a secondary reads from
   IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
-  private int maxBatchesInMemory;
-  private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
+  private final int maxBatchesInMemory;
+  private final List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
   private boolean enableRuntimeFilter;
   private RuntimeFilterReporter runtimeFilterReporter;
   private ValueVectorHashHelper.Hash64 hash64;
-  private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
-  private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
-  private List<BloomFilter> bloomFilters = new ArrayList<>();
-  private boolean bloomFiltersGenerated = false;
+  private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+  private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+  private final List<BloomFilter> bloomFilters = new ArrayList<>();
+  private boolean bloomFiltersGenerated;
 
   /**
    * This holds information about the spilled partitions for the build and probe side.
@@ -223,13 +236,13 @@
     private final String innerSpillFile;
     private int outerSpilledBatches;
     private String outerSpillFile;
-    private boolean updatedOuter = false;
+    private boolean updatedOuter;
 
-    public HashJoinSpilledPartition(final int cycle,
-                                    final int originPartition,
-                                    final int prevOriginPartition,
-                                    final int innerSpilledBatches,
-                                    final String innerSpillFile) {
+    public HashJoinSpilledPartition(int cycle,
+                                    int originPartition,
+                                    int prevOriginPartition,
+                                    int innerSpilledBatches,
+                                    String innerSpillFile) {
       super(cycle, originPartition, prevOriginPartition);
 
       this.innerSpilledBatches = innerSpilledBatches;
@@ -254,7 +267,7 @@
       return outerSpillFile;
     }
 
-    public void updateOuter(final int outerSpilledBatches, final String outerSpillFile) {
+    public void updateOuter(int outerSpilledBatches, String outerSpillFile) {
       Preconditions.checkState(!updatedOuter);
       updatedOuter = true;
 
@@ -294,8 +307,8 @@
   /**
    * Queue of spilled partitions to process.
    */
-  private SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
-  private HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
+  private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
+  private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
   private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
 
   public enum Metric implements MetricDef {
@@ -349,9 +362,11 @@
 
       if (rightUpstream == OK_NEW_SCHEMA) {
         buildSchema = right.getSchema();
-        // position of the new "column" for keeping the hash values (after the real columns)
+        // position of the new "column" for keeping the hash values
+        // (after the real columns)
         rightHVColPosition = right.getContainer().getNumberOfColumns();
-        // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+        // In special cases, when the probe side is empty and the join is
+        // inner/left, there is no need for a Hash Table
         skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
         // We only need the hash tables if we have data on the build side.
         setupHashTable();
@@ -364,10 +379,12 @@
       }
     }
 
-    // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+    // If we have a valid schema, this will build a valid container.
+    // If we were unable to obtain a valid schema,
     // we still need to build a dummy schema. This code handles both cases for us.
     setupOutputContainerSchema();
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setEmpty();
   }
 
   /**
@@ -381,7 +398,9 @@
       buildBatch,
       () -> {
         batchMemoryManager.update(RIGHT_INDEX, 0, true);
-        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+            batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
+            getRecordBatchStatsContext());
       });
   }
 
@@ -396,7 +415,9 @@
       probeBatch,
       () -> {
         batchMemoryManager.update(LEFT_INDEX, 0);
-        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+            batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
+            getRecordBatchStatsContext());
       });
   }
 
@@ -411,11 +432,11 @@
    * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
    */
   private IterOutcome prefetchFirstBatch(IterOutcome outcome,
-                                         final MutableBoolean prefetched,
-                                         final MutableBoolean isEmpty,
-                                         final int index,
-                                         final RecordBatch batch,
-                                         final Runnable memoryManagerUpdate) {
+                                         MutableBoolean prefetched,
+                                         MutableBoolean isEmpty,
+                                         int index,
+                                         RecordBatch batch,
+                                         Runnable memoryManagerUpdate) {
     if (prefetched.booleanValue()) {
       // We have already prefetch the first data holding batch
       return outcome;
@@ -429,7 +450,7 @@
       outcome = sniffNonEmptyBatch(outcome, index, batch);
     }
 
-    isEmpty.setValue(outcome == IterOutcome.NONE); // If we recieved NONE there is no data.
+    isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
 
     if (outcome == IterOutcome.OUT_OF_MEMORY) {
       // We reached a termination state
@@ -440,7 +461,7 @@
     } else {
       // Got our first batch(es)
       if (spilledState.isFirstCycle()) {
-        // Only collect stats for the first cylce
+        // Only collect stats for the first cycle
         memoryManagerUpdate.run();
       }
       state = BatchState.FIRST;
@@ -488,10 +509,10 @@
    */
   public HashJoinMemoryCalculator getCalculatorImpl() {
     if (maxBatchesInMemory == 0) {
-      final double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
-      final double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
-      final double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
-      final String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+      double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
+      double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
+      double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
+      String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
 
       return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
     } else {
@@ -502,7 +523,7 @@
   @Override
   public IterOutcome innerNext() {
     if (wasKilled) {
-      // We have recieved a kill signal. We need to stop processing.
+      // We have received a kill signal. We need to stop processing.
       this.cleanup();
       super.close();
       return IterOutcome.NONE;
@@ -522,7 +543,7 @@
        */
       if (state == BatchState.FIRST) {
         // Build the hash table, using the build side record batches.
-        final IterOutcome buildExecuteTermination = executeBuildPhase();
+        IterOutcome buildExecuteTermination = executeBuildPhase();
 
         if (buildExecuteTermination != null) {
           // A termination condition was reached while executing the build phase.
@@ -591,10 +612,7 @@
 
           outputRecords = hashJoinProbe.probeAndProject();
 
-          for (final VectorWrapper<?> v : container) {
-            v.getValueVector().getMutator().setValueCount(outputRecords);
-          }
-          container.setRecordCount(outputRecords);
+          container.setValueCount(outputRecords);
 
           batchMemoryManager.updateOutgoingStats(outputRecords);
           RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
@@ -685,7 +703,7 @@
   private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
       batch.kill(true);
       while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
-        for (final VectorWrapper<?> wrapper : batch) {
+        for (VectorWrapper<?> wrapper : batch) {
           wrapper.getValueVector().clear();
         }
         upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
@@ -695,7 +713,7 @@
   private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
 
   private void setupHashTable() throws SchemaChangeException {
-    final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+    List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
     if ( skipHashTableBuild ) { return; }
@@ -713,13 +731,13 @@
       leftExpr = null;
     } else {
       if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-        final String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. ").append("Probe batch has selection mode = ").append
+        String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. ").append("Probe batch has selection mode = ").append
           (probeBatch.getSchema().getSelectionVectorMode()).toString();
         throw new SchemaChangeException(errorMsg);
       }
     }
 
-    final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+    HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
       true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
 
     // Create the chained hash table
@@ -735,7 +753,7 @@
     ErrorCollector collector = new ErrorCollectorImpl();
     int i = 0;
     for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
@@ -868,7 +886,7 @@
 
       // TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented.
       // We don't have enough memory to do partitioning, we have to do everything in memory
-      final String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
+      String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
           "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
         numPartitions,
         FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
@@ -876,7 +894,7 @@
       logger.warn(message);
 
       // create a Noop memory calculator
-      final HashJoinMemoryCalculator calc = getCalculatorImpl();
+      HashJoinMemoryCalculator calc = getCalculatorImpl();
       calc.initialize(false);
       buildCalc = calc.next();
 
@@ -908,7 +926,7 @@
   private void disableSpilling(String reason) {
     // Fail, or just issue a warning if a reason was given, or a fallback option is enabled
     if ( reason == null ) {
-      final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+      boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
       if (fallbackEnabled) {
         logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" +
           " to use unbounded memory");
@@ -986,7 +1004,7 @@
     initializeRuntimeFilter();
 
     // Make the calculator aware of our partitions
-    final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
+    HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
     buildCalc.setPartitionStatSet(partitionStatSet);
 
     boolean moreData = true;
@@ -1007,7 +1025,7 @@
         // Fall through
       case OK:
         batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
-        final int currentRecordCount = buildBatch.getRecordCount();
+        int currentRecordCount = buildBatch.getRecordCount();
         //create runtime filter
         if (spilledState.isFirstCycle() && enableRuntimeFilter) {
           //create runtime filter and send out async
@@ -1040,7 +1058,7 @@
 
           }
           // Append the new inner row to the appropriate partition; spill (that partition) if needed
-          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
+          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc);
         }
 
         if ( read_right_HV_vector != null ) {
@@ -1048,6 +1066,8 @@
           read_right_HV_vector = null;
         }
         break;
+      default:
+        throw new IllegalStateException(rightUpstream.name());
       }
       // Get the next incoming record batch
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
@@ -1080,12 +1100,10 @@
     HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
     postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
 
-    //
     //  Traverse all the in-memory partitions' incoming batches, and build their hash tables
-    //
 
     for (int index = 0; index < partitions.length; index++) {
-      final HashPartition partn = partitions[index];
+      HashPartition partn = partitions[index];
 
       if (partn.isSpilled()) {
         // Don't build hash tables for spilled partitions
@@ -1101,7 +1119,7 @@
           partn.buildContainersHashTableAndHelper();
         }
       } catch (OutOfMemoryException e) {
-        final String message = "Failed building hash table on partition " + index + ":\n"
+        String message = "Failed building hash table on partition " + index + ":\n"
           + makeDebugString() + "\n"
           + postBuildCalc.makeDebugString();
         // Include debug info
@@ -1135,9 +1153,9 @@
   private void setupOutputContainerSchema() {
 
     if (buildSchema != null && ! semiJoin ) {
-      for (final MaterializedField field : buildSchema) {
-        final MajorType inputType = field.getType();
-        final MajorType outputType;
+      for (MaterializedField field : buildSchema) {
+        MajorType inputType = field.getType();
+        MajorType outputType;
         // If left or full outer join, then the output type must be nullable. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2197).
         if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
@@ -1148,16 +1166,16 @@
         }
 
         // make sure to project field with children for children to show up in the schema
-        final MaterializedField projected = field.withType(outputType);
+        MaterializedField projected = field.withType(outputType);
         // Add the vector to our output container
         container.addOrGet(projected);
       }
     }
 
     if (probeSchema != null) { // a probe schema was seen (even though the probe may had no rows)
-      for (final VectorWrapper<?> vv : probeBatch) {
-        final MajorType inputType = vv.getField().getType();
-        final MajorType outputType;
+      for (VectorWrapper<?> vv : probeBatch) {
+        MajorType inputType = vv.getField().getType();
+        MajorType outputType;
 
         // If right or full outer join then the output type should be optional. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
@@ -1168,7 +1186,7 @@
           outputType = inputType;
         }
 
-        final ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
+        ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
         if (v instanceof AbstractContainerVector) {
           vv.getValueVector().makeTransferPair(v);
           v.clear();
@@ -1213,15 +1231,15 @@
     buildJoinColumns = Sets.newHashSet();
     List<SchemaPath> rightConditionPaths = new ArrayList<>();
     for (int i = 0; i < conditions.size(); i++) {
-      final SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
       rightConditionPaths.add(rightPath);
     }
 
     for (int i = 0; i < conditions.size(); i++) {
-      final SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
-      final PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
+      SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+      PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
       buildJoinColumns.add(nameSegment.getPath());
-      final String refName = "build_side_" + i;
+      String refName = "build_side_" + i;
       rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
     }
 
@@ -1234,7 +1252,7 @@
 
     numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
 
-    final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
+    long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
 
     if (memLimit != 0) {
       allocator.setLimit(memLimit);
@@ -1250,8 +1268,8 @@
     partitions = new HashPartition[0];
 
     // get the output batch size from config.
-    final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    double avail_mem_factor = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
     int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
 
     RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
@@ -1305,11 +1323,11 @@
    * @return A memory dump string.
    */
   public String makeDebugString() {
-    final StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder();
 
     for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
-      final String partitionPrefix = "Partition " + partitionIndex + ": ";
-      final HashPartition hashPartition = partitions[partitionIndex];
+      String partitionPrefix = "Partition " + partitionIndex + ": ";
+      HashPartition hashPartition = partitions[partitionIndex];
       sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n");
     }
 
@@ -1326,7 +1344,7 @@
     if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
     if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
 
-    final HashTableStats htStats = new HashTableStats();
+    HashTableStats htStats = new HashTableStats();
     long numSpilled = 0;
     HashTableStats newStats = new HashTableStats();
     // sum the stats from all the partitions
@@ -1446,18 +1464,15 @@
       batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
       batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
 
-    this.cleanup();
+    cleanup();
     super.close();
   }
 
   public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
-    final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
-    cg.plainJavaCapable(true);
-    // cg.saveCodeForDebugging(true);
 
     //  No real code generation !!
 
-    return context.getImplementationClass(cg);
+    return new HashJoinProbeTemplate();
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index e862c0e..d40f6a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -35,7 +35,7 @@
 
 import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
 
-public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+public class HashJoinProbeTemplate implements HashJoinProbe {
 
   VectorContainer container; // the outgoing container
 
@@ -50,13 +50,13 @@
   // joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
   private JoinControl joinControl;
 
-  private HashJoinBatch outgoingJoinBatch = null;
+  private HashJoinBatch outgoingJoinBatch;
 
   // Number of records to process on the probe side
-  private int recordsToProcess = 0;
+  private int recordsToProcess;
 
   // Number of records processed on the probe side
-  private int recordsProcessed = 0;
+  private int recordsProcessed;
 
   // Number of records in the output container
   private int outputRecords;
@@ -71,7 +71,7 @@
   private ProbeState probeState = ProbeState.PROBE_PROJECT;
 
   // For outer or right joins, this is a list of unmatched records that needs to be projected
-  private IntArrayList unmatchedBuildIndexes = null;
+  private IntArrayList unmatchedBuildIndexes;
 
   private  HashPartition partitions[];
 
@@ -79,14 +79,14 @@
   // probing later on the same chain of duplicates
   private HashPartition currPartition;
 
-  private int currRightPartition = 0; // for returning RIGHT/FULL
+  private int currRightPartition; // for returning RIGHT/FULL
   IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
-  private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
+  private int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
   private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
   private boolean buildSideIsEmpty = true;
   private int numPartitions = 1; // must be 2 to the power of bitsInMask
-  private int partitionMask = 0; // numPartitions - 1
-  private int bitsInMask = 0; // number of bits in the MASK
+  private int partitionMask; // numPartitions - 1
+  private int bitsInMask; // number of bits in the MASK
   private int numberOfBuildSideColumns;
   private int targetOutputRecords;
   private boolean semiJoin;
@@ -96,6 +96,7 @@
     this.targetOutputRecords = targetOutputRecords;
   }
 
+  @Override
   public int getOutputCount() {
     return outputRecords;
   }
@@ -143,7 +144,7 @@
     this.recordsProcessed = 0;
 
     // A special case - if the left was an empty file
-    if ( leftStartState == IterOutcome.NONE ){
+    if (leftStartState == IterOutcome.NONE){
       changeToFinalProbeState();
     } else {
       this.recordsToProcess = probeBatch.getRecordCount();
@@ -151,16 +152,16 @@
 
     // for those outer partitions that need spilling (cause their matching inners spilled)
     // initialize those partitions' current batches and hash-value vectors
-    for ( HashPartition partn : this.partitions) {
+    for (HashPartition partn : this.partitions) {
       partn.allocateNewCurrentBatchAndHV();
     }
 
     currRightPartition = 0; // In case it's a Right/Full outer join
 
     // Initialize the HV vector for the first (already read) left batch
-    if ( this.cycleNum > 0 ) {
-      if ( read_left_HV_vector != null ) { read_left_HV_vector.clear();}
-      if ( leftStartState != IterOutcome.NONE ) { // Skip when outer spill was empty
+    if (this.cycleNum > 0) {
+      if (read_left_HV_vector != null) { read_left_HV_vector.clear();}
+      if (leftStartState != IterOutcome.NONE) { // Skip when outer spill was empty
         read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast();
       }
     }
@@ -178,6 +179,7 @@
       destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex);
     }
   }
+
   /**
    * Append the given probe side row into the outgoing container, following the build side part
    * @param probeSrcContainer The container for the left/outer side
@@ -190,6 +192,7 @@
       destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex);
     }
   }
+
   /**
    *  A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it
    *  copies the build and probe sides into the outgoing container. (It uses a composite
@@ -204,12 +207,14 @@
   private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
                         VectorContainer probeSrcContainer, int probeSrcIndex) {
 
-    if ( buildSrcContainers != null ) {
+    if (buildSrcContainers != null) {
       int buildBatchIndex = compositeBuildSrcIndex >>> 16;
       int buildOffset = compositeBuildSrcIndex & 65535;
       appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset);
     }
-    if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex); }
+    if (probeSrcContainer != null) {
+      appendProbe(probeSrcContainer, probeSrcIndex);
+    }
     return container.incRecordCount();
   }
 
@@ -221,7 +226,7 @@
     while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
       outputRecords =
         outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
-          null /* no probeBatch */, 0 /* no probe index */ );
+          null /* no probeBatch */, 0 /* no probe index */);
       recordsProcessed++;
     }
   }
@@ -248,8 +253,8 @@
             recordsToProcess = 0;
             changeToFinalProbeState();
             // in case some outer partitions were spilled, need to spill their last batches
-            for ( HashPartition partn : partitions ) {
-              if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
+            for (HashPartition partn : partitions) {
+              if (! partn.isSpilled()) { continue; } // skip non-spilled
               partn.completeAnOuterBatch(false);
               // update the partition's spill record with the outer side
               HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
@@ -262,7 +267,7 @@
 
           case OK_NEW_SCHEMA:
             if (probeBatch.getSchema().equals(probeSchema)) {
-              for ( HashPartition partn : partitions ) { partn.updateBatches(); }
+              for (HashPartition partn : partitions) { partn.updateBatches(); }
 
             } else {
               throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
@@ -278,7 +283,7 @@
             if (recordsToProcess == 0) {
               continue;
             }
-            if ( cycleNum > 0 ) {
+            if (cycleNum > 0) {
               read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
             }
         }
@@ -287,8 +292,8 @@
       int probeIndex = -1;
       // Check if we need to drain the next row in the probe side
       if (getNextRecord) {
-        if ( !buildSideIsEmpty ) {
-          int hashCode = ( cycleNum == 0 ) ?
+        if (!buildSideIsEmpty) {
+          int hashCode = (cycleNum == 0) ?
             partitions[0].getProbeHashCode(recordsProcessed)
             : read_left_HV_vector.getAccessor().get(recordsProcessed);
           int currBuildPart = hashCode & partitionMask;
@@ -299,7 +304,7 @@
           currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
 
           // If the matching inner partition was spilled
-          if ( outgoingJoinBatch.isSpilledInner(currBuildPart) ) {
+          if (outgoingJoinBatch.isSpilledInner(currBuildPart)) {
             // add this row to its outer partition (may cause a spill, when the batch is full)
 
             currPartition.appendOuterRow(hashCode, recordsProcessed);
@@ -312,8 +317,8 @@
 
         }
 
-        if ( semiJoin ) {
-          if ( probeIndex != -1 ) {
+        if (semiJoin) {
+          if (probeIndex != -1) {
             // output the probe side only
             outputRecords =
               outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
@@ -392,7 +397,6 @@
         }
       }
     }
-
   }
 
   /**
@@ -408,7 +412,7 @@
     outputRecords = 0;
 
     // When handling spilled partitions, the state becomes DONE at the end of each partition
-    if ( probeState == ProbeState.DONE ) {
+    if (probeState == ProbeState.DONE) {
       return outputRecords; // that is zero
     }
 
@@ -422,7 +426,7 @@
       do {
 
         if (unmatchedBuildIndexes == null) { // first time for this partition ?
-          if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
+          if (buildSideIsEmpty) { return outputRecords; } // in case of an empty right
           // Get this partition's list of build indexes that didn't match any record on the probe side
           unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
           recordsProcessed = 0;
@@ -432,14 +436,14 @@
         // Project the list of unmatched records on the build side
         executeProjectRightPhase(currRightPartition);
 
-        if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
+        if (recordsProcessed < recordsToProcess) { // more records in this partition?
           return outputRecords;  // outgoing is full; report and come back later
         } else {
           currRightPartition++; // on to the next right partition
           unmatchedBuildIndexes = null;
         }
 
-      }   while ( currRightPartition < numPartitions );
+      }   while (currRightPartition < numPartitions);
 
       probeState = ProbeState.DONE; // last right partition was handled; we are done now
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index ae04579..3f90a3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -147,10 +147,7 @@
       incoming.getContainer().zeroVectors();
       outgoingSv.setRecordCount(0);
       outgoingSv.setBatchActualRecordCount(0);
-      // Must allocate vectors to allow for offset vectors which
-      // require a zero in the 0th position.
-      container.allocateNew();
-      container.setRecordCount(0);
+      container.setEmpty();
       // Release buffer for sv2 (if any)
       if (incomingSv != null) {
         incomingSv.clear();
@@ -162,16 +159,9 @@
       tp.transfer();
     }
 
-    // Must report the actual value count as the record
-    // count. This is NOT the input record count when the input
-    // is an SV2.
-    int inputValueCount = incoming.getContainer().getRecordCount();
-    container.setRecordCount(inputValueCount);
-
     // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
     // batch to output record batch and later an SV2Remover copies the needed records.
     outgoingSv.allocateNew(inputRecordCount);
-    outgoingSv.setBatchActualRecordCount(inputValueCount);
     limit(inputRecordCount);
 
     // Release memory for incoming sv (if any)
@@ -224,7 +214,7 @@
       }
     }
 
-    outgoingSv.setRecordCount(svIndex);
+    setOutgoingRecordCount(inputRecordCount, svIndex);
   }
 
   private void updateOutputSV2(int svIndex, int incomingIndex) {
@@ -243,6 +233,16 @@
     }
   }
 
+  private void setOutgoingRecordCount(int inputRecordCount, int outputCount) {
+    outgoingSv.setRecordCount(outputCount);
+    // Must report the actual value count as the record
+    // count. This is NOT the input record count when the input
+    // is an SV2.
+    int inputValueCount = incoming.getContainer().getRecordCount();
+    container.setRecordCount(inputValueCount);
+    outgoingSv.setBatchActualRecordCount(inputValueCount);
+  }
+
   /**
    * Reset the states for recordStartOffset, numberOfRecords and based on the
    * {@link PartitionLimit} passed to the operator. It also resets the
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8f16b51..420b61a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -18,19 +18,17 @@
 package org.apache.drill.exec.physical.impl.mergereceiver;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.ArrayList;
 
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
@@ -63,13 +61,8 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.SchemaBuilder;
-import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -78,20 +71,21 @@
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
-
 import io.netty.buffer.ByteBuf;
 
 /**
  * The MergingRecordBatch merges pre-sorted record batches from remote senders.
  */
 public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(MergingRecordBatch.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(MergingRecordBatch.class);
 
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
@@ -99,21 +93,20 @@
   private RecordBatchLoader[] batchLoaders;
   private final RawFragmentBatchProvider[] fragProviders;
   private final ExchangeFragmentContext context;
-  private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
   private final MergingReceiverPOP config;
-  private boolean hasRun = false;
+  private boolean hasRun;
   private boolean outgoingBatchHasSpace = true;
   private boolean hasMoreIncoming = true;
 
-  private int outgoingPosition = 0;
-  private int senderCount = 0;
+  private int outgoingPosition;
+  private int senderCount;
   private RawFragmentBatch[] incomingBatches;
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
   private RawFragmentBatch[] tempBatchHolder;
-  private long[] inputCounts;
-  private long[] outputCounts;
+  private final long[] inputCounts;
+  private final long[] outputCounts;
 
   public enum Metric implements MetricDef {
     BYTES_RECEIVED,
@@ -132,7 +125,6 @@
     super(config, context, true, context.newOperatorContext(config));
     this.fragProviders = fragProviders;
     this.context = context;
-    this.outgoingContainer = new VectorContainer(oContext);
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
     this.inputCounts = new long[config.getNumSenders()];
@@ -343,11 +335,11 @@
         bldr.addField(v.getField());
 
         // allocate a new value vector
-        outgoingContainer.addOrGet(v.getField());
+        container.addOrGet(v.getField());
       }
       allocateOutgoing();
 
-      outgoingContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
       // generate code for merge operations (copy and compare)
       try {
@@ -479,9 +471,7 @@
     }
 
     // set the value counts in the outgoing vectors
-    for (final VectorWrapper<?> vw : outgoingContainer) {
-      vw.getValueVector().getMutator().setValueCount(outgoingPosition);
-    }
+    container.setValueCount(outgoingPosition);
 
     if (pqueue.isEmpty()) {
       state = BatchState.DONE;
@@ -535,8 +525,8 @@
 
   @Override
   public BatchSchema getSchema() {
-    if (outgoingContainer.hasSchema()) {
-      return outgoingContainer.getSchema();
+    if (container.hasSchema()) {
+      return container.getSchema();
     }
     return null;
   }
@@ -567,7 +557,7 @@
         }
         tempBatchHolder[i] = batch;
         for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
-          final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+          final ValueVector v = container.addOrGet(MaterializedField.create(field));
           v.allocateNew();
         }
         break;
@@ -575,7 +565,8 @@
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    outgoingContainer.buildSchema(SelectionVectorMode.NONE);
+    container.buildSchema(SelectionVectorMode.NONE);
+    container.setEmpty();
   }
 
   @Override
@@ -644,36 +635,6 @@
     //No op
   }
 
-  @Override
-  public Iterator<VectorWrapper<?>> iterator() {
-    return outgoingContainer.iterator();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public SelectionVector4 getSelectionVector4() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(final SchemaPath path) {
-    return outgoingContainer.getValueVectorId(path);
-  }
-
-  @Override
-  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
-    return outgoingContainer.getValueAccessorById(clazz, ids);
-  }
-
-  @Override
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.get(this);
-  }
-
   private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
     Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
 
@@ -689,7 +650,7 @@
   }
 
   private void allocateOutgoing() {
-    for (final VectorWrapper<?> w : outgoingContainer) {
+    for (final VectorWrapper<?> w : container) {
       final ValueVector v = w.getValueVector();
       if (v instanceof FixedWidthVector) {
         AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
@@ -732,7 +693,7 @@
       g.setMappingSet(MAIN_MAPPING);
       final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
 
-      merger.doSetup(context, batch, outgoingContainer);
+      merger.doSetup(context, batch, container);
       return merger;
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException(e);
@@ -742,7 +703,7 @@
   private final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
   private final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
   private final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+  private final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
   private final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
 
   private void generateComparisons(final ClassGenerator<?> g, final VectorAccessible batch) throws SchemaChangeException {
@@ -817,7 +778,7 @@
 
   @Override
   public void close() {
-    outgoingContainer.clear();
+    container.clear();
     if (batchLoaders != null) {
       for (final RecordBatchLoader rbl : batchLoaders) {
         if (rbl != null) {
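Note: two related cleanups run through the MergingRecordBatch hunks above. The separate outgoingContainer field is dropped in favor of the container inherited from AbstractRecordBatch, which also lets the duplicated accessor overrides be deleted, and the per-vector value-count loop is collapsed into a single container call. A small sketch of that second equivalence, assuming a populated VectorContainer named container and a row count n; the surrounding method is illustrative only:

    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.VectorWrapper;

    class ValueCountSketch {

      static void setCounts(VectorContainer container, int n) {
        // Before: walk every wrapper and set the count on each vector's mutator.
        for (VectorWrapper<?> w : container) {
          w.getValueVector().getMutator().setValueCount(n);
        }

        // After: one call that applies the same count to every vector the
        // container holds, as done in the replacement lines above.
        container.setValueCount(n);
      }
    }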
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index e8b522c..c309e8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.FieldReference;
@@ -69,43 +70,45 @@
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
 /**
- * The purpose of this operator is to generate an ordered partition, rather than a random hash partition. This could be
- * used to do a total order sort, for example. This operator reads in a few incoming record batches, samples these
- * batches, and stores them in the distributed cache. The samples from all the parallel-running fragments are merged,
- * and a partition-table is built and stored in the distributed cache for use by all fragments. A new column is added to
- * the outgoing batch, whose value is determined by where each record falls in the partition table. This column is used
- * by PartitionSenderRootExec to determine which bucket to assign each record to.
+ * Generates an ordered partition, rather than a random hash partition. This
+ * could be used to do a total order sort, for example. This operator reads in a
+ * few incoming record batches, samples these batches, and stores them in the
+ * distributed cache. The samples from all the parallel-running fragments are
+ * merged, and a partition-table is built and stored in the distributed cache
+ * for use by all fragments. A new column is added to the outgoing batch, whose
+ * value is determined by where each record falls in the partition table. This
+ * column is used by PartitionSenderRootExec to determine which bucket to assign
+ * each record to.
  */
 public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
 
-//  private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-//  private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
-  public static final CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = CacheConfig //
-      .newBuilder(CachedVectorContainer.class) //
-      .name("SINGLE-" + CachedVectorContainer.class.getSimpleName()) //
-      .mode(SerializationMode.DRILL_SERIALIZIABLE) //
+  public static final CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = CacheConfig
+      .newBuilder(CachedVectorContainer.class)
+      .name("SINGLE-" + CachedVectorContainer.class.getSimpleName())
+      .mode(SerializationMode.DRILL_SERIALIZIABLE)
       .build();
-  public static final CacheConfig<String, CachedVectorContainer> MULTI_CACHE_CONFIG = CacheConfig //
-      .newBuilder(CachedVectorContainer.class) //
-      .name("MULTI-" + CachedVectorContainer.class.getSimpleName()) //
-      .mode(SerializationMode.DRILL_SERIALIZIABLE) //
+  public static final CacheConfig<String, CachedVectorContainer> MULTI_CACHE_CONFIG = CacheConfig
+      .newBuilder(CachedVectorContainer.class)
+      .name("MULTI-" + CachedVectorContainer.class.getSimpleName())
+      .mode(SerializationMode.DRILL_SERIALIZIABLE)
       .build();
 
-  private final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP,
-      ClassGenerator.DEFAULT_SCALAR_MAP);
+  private final MappingSet mainMapping = new MappingSet( (String) null, null,
+      ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
   private final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
       ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet partitionMapping = new MappingSet("partitionIndex", null, "partitionVectors", null,
+  private final MappingSet partitionMapping = new MappingSet("partitionIndex",
+      null, "partitionVectors", null,
       ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
 
   private final int recordsToSample; // How many records must be received before analyzing
@@ -115,13 +118,13 @@
   protected final RecordBatch incoming;
   private boolean first = true;
   private OrderedPartitionProjector projector;
-  private VectorContainer partitionVectors = new VectorContainer();
-  private int partitions;
+  private final VectorContainer partitionVectors = new VectorContainer();
+  private final int partitions;
   private Queue<VectorContainer> batchQueue;
   private int recordsSampled;
-  private int sendingMajorFragmentWidth;
-  private boolean startedUnsampledBatches = false;
-  private boolean upstreamNone = false;
+  private final int sendingMajorFragmentWidth;
+  private boolean startedUnsampledBatches;
+  private boolean upstreamNone;
   private int recordCount;
 
   private final IntVector partitionKeyVector;
@@ -142,6 +145,7 @@
 
     DistributedCache cache = null;
     // Clearly, this code is not used!
+    // cache can only be null here!
     this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG);
     this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG);
     Preconditions.checkNotNull(tableMap);
@@ -161,7 +165,6 @@
     partitionKeyVector.clear();
   }
 
-
   private boolean saveSamples() throws SchemaChangeException, ClassTransformationException, IOException {
     recordsSampled = 0;
     IterOutcome upstream;
@@ -223,10 +226,7 @@
           allocationSize *= 2;
         }
       }
-      for (VectorWrapper<?> vw : containerToCache) {
-        vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
-      }
-      containerToCache.setRecordCount(copier.getOutputRecords());
+      containerToCache.setValueCount(copier.getOutputRecords());
 
       // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
       // into a serializable wrapper object, and then add to distributed map
@@ -251,9 +251,13 @@
   }
 
   /**
-   * Wait until the at least the given timeout is expired or interrupted and the fragment status is not runnable.
-   * @param timeout Timeout in milliseconds.
-   * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+   * Wait until at least the given timeout has expired, or until the wait is
+   * interrupted and the fragment status is not runnable.
+   *
+   * @param timeout
+   *          Timeout in milliseconds.
+   * @return True if the given timeout is expired. False when interrupted and
+   *         the fragment status is not runnable.
    */
   private boolean waitUntilTimeOut(final long timeout) {
     while(true) {
@@ -269,11 +273,14 @@
   }
 
   /**
-   * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
-   * that point, the records in the batches are sorted and sampled, and the sampled records are stored in the
-   * distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
-   * the samples, sorts all the records, builds a partition table, and attempts to push the partition table to the
-   * distributed cache. Whichever table gets pushed first becomes the table used by all fragments for partitioning.
+   * Called when the first batch comes in. Incoming batches are collected until
+   * a threshold is met. At that point, the records in the batches are sorted
+   * and sampled, and the sampled records are stored in the distributed cache.
+   * Once a sufficient fraction of the fragments have shared their samples, each
+   * fragment grabs all the samples, sorts all the records, builds a partition
+   * table, and attempts to push the partition table to the distributed cache.
+   * Whichever table gets pushed first becomes the table used by all fragments
+   * for partitioning.
    *
    * @return True is successful. False if failed.
    */
@@ -288,8 +295,9 @@
       long val = minorFragmentSampleCount.incrementAndGet();
       logger.debug("Incremented mfsc, got {}", val);
 
-      final long fragmentsBeforeProceed = (long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
-      final String finalTableKey = mapKey + "final";
+      long fragmentsBeforeProceed =
+          (long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
+      String finalTableKey = mapKey + "final";
 
       if (val == fragmentsBeforeProceed) { // we crossed the barrier, build table and get data.
         buildTable();
@@ -326,7 +334,7 @@
         partitionVectors.add(w.getValueVector());
       }
 
-    } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
+    } catch (ClassTransformationException | IOException | SchemaChangeException ex) {
       kill(false);
       context.getExecutorState().fail(ex);
       return false;
@@ -340,8 +348,8 @@
     // Get all samples from distributed map
 
     SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator());
-    final VectorContainer allSamplesContainer = new VectorContainer();
-    final VectorContainer candidatePartitionTable = new VectorContainer();
+    VectorContainer allSamplesContainer = new VectorContainer();
+    VectorContainer candidatePartitionTable = new VectorContainer();
     CachedVectorContainer wrap = null;
     try {
       for (CachedVectorContainer w : mmap.get(mapKey)) {
@@ -375,9 +383,7 @@
         int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
         if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
           assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
-          for (VectorWrapper<?> vw : candidatePartitionTable) {
-            vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
-          }
+          candidatePartitionTable.setValueCount(copier.getOutputRecords());
           break;
         } else {
           candidatePartitionTable.zeroVectors();
@@ -397,7 +403,6 @@
         wrap.clear();
       }
     }
-
   }
 
   /**
@@ -415,8 +420,8 @@
    */
   private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
       List<Ordering> orderings, List<ValueVector> localAllocationVectors) throws SchemaChangeException {
-    final ErrorCollector collector = new ErrorCollectorImpl();
-    final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions());
+    ErrorCollector collector = new ErrorCollectorImpl();
+    ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions());
     // Note: disabled for now. This may require some debugging:
     // no tests are available for this operator.
     // cg.getCodeGenerator().plainOldJavaCapable(true);
@@ -425,7 +430,7 @@
 
     int i = 0;
     for (Ordering od : orderings) {
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
       TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
           .clearMode().setMode(TypeProtos.DataMode.REQUIRED);
       TypeProtos.MajorType newType = builder.build();
@@ -567,31 +572,29 @@
     int recordCount = batch.getRecordCount();
     AllocationHelper.allocate(partitionKeyVector, recordCount, 50);
     projector.projectRecords(recordCount, 0);
-    for (VectorWrapper<?> v : container) {
-      ValueVector.Mutator m = v.getValueVector().getMutator();
-      m.setValueCount(recordCount);
-    }
+    container.setValueCount(recordCount);
   }
 
   /**
-   * Sets up projection that will transfer all of the columns in batch, and also populate the partition column based on
-   * which partition a record falls into in the partition table
+   * Sets up a projection that will transfer all of the columns in the batch,
+   * and also populate the partition column based on which partition in the
+   * partition table each record falls into.
    *
    * @param batch
    * @throws SchemaChangeException
    */
   protected void setupNewSchema(VectorAccessible batch) throws SchemaChangeException {
     container.clear();
-    final ErrorCollector collector = new ErrorCollectorImpl();
-    final List<TransferPair> transfers = Lists.newArrayList();
+    ErrorCollector collector = new ErrorCollectorImpl();
+    List<TransferPair> transfers = Lists.newArrayList();
 
-    final ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot(
+    ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot(
         OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getOptions());
     // Note: disabled for now. This may require some debugging:
     // no tests are available for this operator.
-//    cg.getCodeGenerator().plainOldJavaCapable(true);
+    // cg.getCodeGenerator().plainOldJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    cg.getCodeGenerator().saveCodeForDebugging(true);
+    // cg.getCodeGenerator().saveCodeForDebugging(true);
 
     for (VectorWrapper<?> vw : batch) {
       TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator());
@@ -603,7 +606,7 @@
 
     int count = 0;
     for (Ordering od : popConfig.getOrderings()) {
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index edbbfe2..a3149b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -51,13 +51,15 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class PartitionerTemplate implements Partitioner {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class);
+  static final Logger logger = LoggerFactory.getLogger(PartitionerTemplate.class);
 
-  // Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors
+  // Always keep the recordCount as (2^x) - 1 to better utilize the memory
+  // allocation in ValueVectors
   private static final int DEFAULT_RECORD_BATCH_SIZE = (1 << 10) - 1;
 
   private SelectionVector2 sv2;
@@ -68,7 +70,7 @@
   protected FragmentContext context;
   private int start;
   private int end;
-  private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
+  private final List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
 
   private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE;
 
@@ -306,10 +308,14 @@
 
     public void flush(boolean schemaChanged) throws IOException {
       if (dropAll) {
-        // If we are in dropAll mode, we still want to copy the data, because we can't stop copying a single outgoing
-        // batch with out stopping all outgoing batches. Other option is check for status of dropAll before copying
-        // every single record in copy method which has the overhead for every record all the time. Resetting the output
-        // count, reusing the same buffers and copying has overhead only for outgoing batches whose receiver has
+        // If we are in dropAll mode, we still want to copy the data, because we
+        // can't stop copying a single outgoing batch without stopping all
+        // outgoing batches. The other option is to check the status of dropAll
+        // before copying every single record in the copy method, which has
+        // overhead for every record all the time. Resetting the output count,
+        // reusing the same buffers and copying has overhead only for outgoing
+        // batches whose receiver has
         // terminated.
 
         // Reset the count to 0 and use existing buffers for exhausting input where receiver of this batch is terminated
@@ -332,11 +338,7 @@
         return;
       }
 
-      if (recordCount != 0) {
-        for (VectorWrapper<?> w : vectorContainer) {
-          w.getValueVector().getMutator().setValueCount(recordCount);
-        }
-      }
+      vectorContainer.setValueCount(recordCount);
 
       FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLastBatch,
           handle.getQueryId(),
@@ -354,8 +356,10 @@
         stats.stopWait();
       }
 
-      // If the current batch is the last batch, then set a flag to ignore any requests to flush the data
-      // This is possible when the receiver is terminated, but we still get data from input operator
+      // If the current batch is the last batch, then set a flag to ignore any
+      // requests to flush the data. This is possible when the receiver is
+      // terminated, but we still get data from the input operator.
       if (isLastBatch) {
         dropAll = true;
       }
@@ -421,7 +425,6 @@
       return recordCount;
     }
 
-
     @Override
     public long getTotalRecords() {
       return totalRecords;
@@ -459,6 +462,5 @@
     public void clear(){
       vectorContainer.clear();
     }
-
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 8215fde..674b3ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -886,7 +886,7 @@
 
     doAlloc(0);
     container.buildSchema(SelectionVectorMode.NONE);
-    container.setValueCount(0);
+    container.setEmpty();
     wasNone = true;
     return IterOutcome.OK_NEW_SCHEMA;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 0c11893..dd30c44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -34,6 +34,7 @@
 
   private ImmutableList<TransferPair> transfers;
   private SelectionVector2 vector2;
+  @SuppressWarnings("unused")
   private SelectionVector4 vector4;
   private SelectionVectorMode svMode;
 
@@ -97,8 +98,10 @@
     case TWO_BYTE:
       vector2 = incoming.getSelectionVector2();
       break;
-    default:
+    case NONE:
       break;
+    default:
+      throw new UnsupportedOperationException();
     }
     this.transfers = ImmutableList.copyOf(transfers);
     doSetup(context, incoming, outgoing);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 6a54828..11d307b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -35,34 +35,33 @@
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * A RangePartitionRecordBatch is a run-time operator that provides the ability to divide up the input rows
- * into a fixed number of separate ranges or 'buckets' based on the values of a set of columns (the range
- * partitioning columns).
+ * Provides the ability to divide up the input rows into a fixed number of
+ * separate ranges or 'buckets' based on the values of a set of columns (the
+ * range partitioning columns).
  */
 public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePartitionSender> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangePartitionRecordBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(RangePartitionRecordBatch.class);
 
-  private int numPartitions;
+  private final int numPartitions;
   private int recordCount;
   private final IntVector partitionIdVector;
   private final List<TransferPair> transfers;
 
   public RangePartitionRecordBatch(RangePartitionSender popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, context, incoming);
-    this.numPartitions = popConfig.getDestinations().size();
+    numPartitions = popConfig.getDestinations().size();
 
     SchemaPath outputPath = popConfig.getPartitionFunction().getPartitionFieldRef();
     MaterializedField outputField = MaterializedField.create(outputPath.getAsNamePart().getName(), Types.required(TypeProtos.MinorType.INT));
-    this.partitionIdVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator());
-    this.transfers = Lists.newArrayList();
+    partitionIdVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator());
+    transfers = Lists.newArrayList();
   }
 
-
   @Override
   public void close() {
     super.close();
@@ -91,10 +90,7 @@
       partitionIdVector.allocateNew(num);
 
       recordCount = projectRecords(num, 0);
-      for (VectorWrapper<?> v : container) {
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(recordCount);
-      }
+      container.setValueCount(recordCount);
     }
     // returning OK here is fine because the base class AbstractSingleRecordBatch
     // is handling the actual return status; thus if there was a new schema, the
@@ -169,8 +165,8 @@
    * @param firstOutputIndex
    * @return the number of records projected
    */
-  private final int projectRecords(final int recordCount, int firstOutputIndex) {
-    final int countN = recordCount;
+  private final int projectRecords(int recordCount, int firstOutputIndex) {
+    int countN = recordCount;
     int counter = 0;
     for (int i = 0; i < countN; i++, firstOutputIndex++) {
       int partition = getPartition(i);
@@ -183,7 +179,6 @@
     return counter;
   }
 
-
   @Override
   public void dump() {
     logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 970f970..1394938 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -23,8 +23,10 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public abstract class AbstractSV4Copier extends AbstractCopier {
-  // Storing VectorWrapper reference instead of ValueVector[]. With EMIT outcome support underlying operator
-  // operator can generate multiple output batches with no schema changes which will change the ValueVector[]
+  // Storing VectorWrapper reference instead of ValueVector[]. With EMIT outcome
+  // support, the underlying operator can generate multiple output batches with
+  // no schema changes, which will change the ValueVector[]
   // reference but not VectorWrapper reference.
   protected VectorWrapper<?>[] vvIn;
   private SelectionVector4 sv4;
@@ -32,21 +34,18 @@
   @Override
   public void setup(VectorAccessible incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
-    this.sv4 = incoming.getSelectionVector4();
+    sv4 = incoming.getSelectionVector4();
 
-    final int count = outgoing.getNumberOfColumns();
+    int count = outgoing.getNumberOfColumns();
     vvIn = new VectorWrapper[count];
 
-    {
-      int index = 0;
-
-      for (VectorWrapper vectorWrapper: incoming) {
-        vvIn[index] = vectorWrapper;
-        index++;
-      }
+    int index = 0;
+    for (VectorWrapper<?> vectorWrapper: incoming) {
+      vvIn[index++] = vectorWrapper;
     }
   }
 
+  @Override
   public void copyEntryIndirect(int inIndex, int outIndex) {
     copyEntry(sv4.get(inIndex), outIndex);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index f64a11e..0bb7186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -42,7 +42,7 @@
     {
       int index = 0;
 
-      for (VectorWrapper vectorWrapper: incoming) {
+      for (VectorWrapper<?> vectorWrapper: incoming) {
         vvIn[index] = vectorWrapper.getValueVector();
         index++;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index e81eb43..1117ab3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -27,7 +27,7 @@
 
   public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
                           SchemaChangeCallBack callBack) {
-    for(VectorWrapper<?> vv : incomingBatch){
+    for (VectorWrapper<?> vv : incomingBatch) {
       ValueVector v = vv.getValueVectors()[0];
       v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack));
     }
@@ -37,7 +37,7 @@
   public void copyEntry(int inIndex, int outIndex) {
     int inOffset = inIndex & 0xFFFF;
     int inVector = inIndex >>> 16;
-    for ( int i = 0;  i < vvIn.length;  i++ ) {
+    for (int i = 0; i < vvIn.length; i++) {
       ValueVector[] vectorsFromIncoming = vvIn[i].getValueVectors();
       vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
     }
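Note for readers unfamiliar with the indexing used in copyEntry() above: a four-byte SV4 entry packs the batch number in its upper 16 bits and the row offset within that batch in its lower 16 bits, which is what the & 0xFFFF and >>> 16 operations recover. A standalone illustration in plain Java (no Drill types; the values are arbitrary):

    public class Sv4IndexDemo {
      public static void main(String[] args) {
        int batch = 3;    // which stacked batch the row lives in
        int offset = 42;  // row position inside that batch

        // Pack the pair into one int the way an SV4 entry is laid out.
        int packed = (batch << 16) | offset;

        // Decode exactly as GenericSV4Copier.copyEntry() does.
        int inOffset = packed & 0xFFFF;  // 42
        int inVector = packed >>> 16;    // 3
        System.out.println("vector=" + inVector + ", offset=" + inOffset);
      }
    }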
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index d45157a..4471248 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -43,14 +43,18 @@
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    // Don't clear off container just because an OK_NEW_SCHEMA was received from upstream. For cases when there is just
-    // change in container type but no actual schema change, RemovingRecordBatch should consume OK_NEW_SCHEMA and
-    // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container
+    // Don't clear off the container just because an OK_NEW_SCHEMA was received
+    // from upstream. For cases when there is just a change in container type
+    // but no actual schema change, RemovingRecordBatch should consume
+    // OK_NEW_SCHEMA and send OK downstream instead. Since the output of
+    // RemovingRecordBatch is always going to be a regular container, a
     // change in incoming container type is not actual schema change.
     container.zeroVectors();
     copier = GenericCopierFactory.createAndSetupCopier(incoming, container, callBack);
 
-    // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA
+    // If there is an actual schema change then the condition below will be true
+    // and it will send OK_NEW_SCHEMA
     // downstream too
     if (container.isSchemaChanged()) {
       container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index d56f848..dca23cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -23,7 +23,6 @@
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.VectorAccessibleSerializable;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Trace;
@@ -38,24 +37,27 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/** TraceRecordBatch contains value vectors which are exactly the same
+/**
+ * Contains value vectors which are exactly the same
  * as the incoming record batch's value vectors. If the incoming
  * record batch has a selection vector (type 2) then TraceRecordBatch
  * will also contain a selection vector.
- *
+ * <p>
  * Purpose of this record batch is to dump the data associated with all
  * the value vectors and selection vector to disk.
- *
+ * <p>
  * This record batch does not modify any data or schema, it simply
  * consumes the incoming record batch's data, dump to disk and pass the
  * same set of value vectors (and selection vectors) to its parent record
- * batch
+ * batch.
  */
 public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(TraceRecordBatch.class);
 
-  private SelectionVector2 sv = null;
+  private SelectionVector2 sv;
 
   private final BufferAllocator localAllocator;
 
@@ -75,13 +77,14 @@
     localAllocator = context.getNewChildAllocator("trace", 200, 0, Long.MAX_VALUE);
     String fileName = getFileName();
 
-    /* Create the log file we will dump to and initialize the file descriptors */
+    // Create the log file we will dump to and initialize the file descriptors
     try {
       Configuration conf = new Configuration();
-      conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
+      conf.set(FileSystem.FS_DEFAULT_NAME_KEY,
+          context.getConfig().getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
       FileSystem fs = FileSystem.get(conf);
 
-      /* create the file */
+      // create the file
       fos = fs.create(new Path(fileName));
     } catch (IOException e) {
         throw new ExecutionSetupException("Unable to create file: " + fileName + " check permissions or if directory exists", e);
@@ -91,15 +94,15 @@
   @Override
   public int getRecordCount() {
     if (sv == null) {
-      return incoming.getRecordCount();
+      return container.getRecordCount();
     } else {
       return sv.getCount();
     }
   }
 
   /**
-   * Function is invoked for every record batch and it simply dumps the buffers associated with all the value vectors in
-   * this record batch to a log file.
+   * Invoked for every record batch and it simply dumps the buffers
+   * associated with all the value vectors in this record batch to a log file.
    */
   @Override
   protected IterOutcome doWork() {
@@ -110,7 +113,7 @@
     } else {
       sv = null;
     }
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2);
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getContainer().getRecordCount(), incoming, incomingHasSv2);
     VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, oContext.getAllocator());
 
     try {
@@ -121,28 +124,31 @@
     batch.reconstructContainer(localAllocator, container);
     if (incomingHasSv2) {
       sv = wrap.getSv2();
+      container.setRecordCount(sv.getBatchActualRecordCount());
+    } else {
+      container.setRecordCount(batch.getDef().getRecordCount());
     }
     return getFinalOutcome(false);
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
-    /* Trace operator does not deal with hyper vectors yet */
+  protected boolean setupNewSchema() {
+    // Trace operator does not deal with hyper vectors yet
     if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
-      throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+      throw new UnsupportedOperationException("Trace operator does not work with hyper vectors");
     }
 
-    /*
-     * we have a new schema, clear our existing container to load the new value vectors
-     */
+    // we have a new schema, clear our existing container to load the new value vectors
+
     container.clear();
 
-    /* Add all the value vectors in the container */
+    // Add all the value vectors in the container
     for (VectorWrapper<?> vv : incoming) {
       TransferPair tp = vv.getValueVector().getTransferPair(oContext.getAllocator());
       container.add(tp.getTo());
     }
     container.buildSchema(incoming.getSchema().getSelectionVectorMode());
+    container.setRecordCount(incoming.getRecordCount());
     return true;
   }
 
@@ -157,12 +163,12 @@
 
   @Override
   public void close() {
-    /* Release the selection vector */
+    // Release the selection vector
     if (sv != null) {
       sv.clear();
     }
 
-    /* Close the file descriptors */
+    // Close the file descriptors
     try {
       fos.close();
     } catch (IOException e) {
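Note: the TraceRecordBatch changes above follow the convention this patch applies everywhere, namely that the operator reports counts from its own output container (and SV2) after explicitly setting them, rather than reaching back into the incoming batch. A condensed sketch of that reporting rule, assuming fields shaped like the ones in the hunk (container, sv); this is not the full operator, just the count logic:

    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.selection.SelectionVector2;

    class RecordCountSketch {
      private VectorContainer container;
      private SelectionVector2 sv;

      // With a selection vector, the SV2 count is the number of visible rows;
      // otherwise report the count that doWork() set on the container after
      // reconstructing it from the serialized batch.
      public int getRecordCount() {
        return (sv == null) ? container.getRecordCount() : sv.getCount();
      }
    }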
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 8c9bb47..25dae80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -338,11 +338,10 @@
     public int getRemainingRecords() {
       return (totalRecordsToProcess - recordsProcessed);
     }
-
   }
 
   private class UnionInputIterator implements Iterator<Pair<IterOutcome, BatchStatusWrappper>> {
-    private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
+    private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
 
     UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
       if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fa8c954..1715c99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -374,8 +374,7 @@
     TransferPair tp = resetUnnestTransferPair();
     container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
     container.buildSchema(SelectionVectorMode.NONE);
-    container.allocateNew();
-    container.setValueCount(0);
+    container.setEmpty();
     return true;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index a6ebef0..d40bd6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -51,9 +51,11 @@
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class UnorderedReceiverBatch implements CloseableRecordBatch {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(UnorderedReceiverBatch.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(UnorderedReceiverBatch.class);
 
   private final RecordBatchLoader batchLoader;
@@ -78,11 +80,15 @@
     }
   }
 
-  public UnorderedReceiverBatch(final ExchangeFragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
+  public UnorderedReceiverBatch(ExchangeFragmentContext context,
+      RawFragmentBatchProvider fragProvider, UnorderedReceiver config)
+          throws OutOfMemoryException {
     this.fragProvider = fragProvider;
     this.context = context;
-    // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
-    // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
+    // In the normal case, batchLoader does not require an allocator. However,
+    // in the case of splitAndTransfer of a value vector, we may need an
+    // allocator for the new offset vector. Therefore, here we pass the
+    // context's allocator to batchLoader.
     oContext = context.newOperatorContext(config);
     this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
 
@@ -90,8 +96,10 @@
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
 
-    // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
-    context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
+    // Register this operator's buffer allocator so that incoming buffers are
+    // owned by this allocator
+    context.getBuffers().getCollector(config.getOppositeMajorFragmentId())
+      .setAllocator(oContext.getAllocator());
   }
 
   @Override
@@ -110,7 +118,7 @@
   }
 
   @Override
-  public void kill(final boolean sendUpstream) {
+  public void kill(boolean sendUpstream) {
     if (sendUpstream) {
       informSenders();
     }
@@ -133,12 +141,12 @@
   }
 
   @Override
-  public TypedFieldId getValueVectorId(final SchemaPath path) {
+  public TypedFieldId getValueVectorId(SchemaPath path) {
     return batchLoader.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
@@ -146,8 +154,9 @@
     try {
       injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
       return fragProvider.getNext();
-    } catch(final InterruptedException e) {
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+    } catch (InterruptedException e) {
+      // Preserve evidence that the interruption occurred so that code higher up
+      // on the call stack can learn of the
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
 
@@ -190,8 +199,8 @@
         return lastOutcome;
       }
 
-      final RecordBatchDef rbd = batch.getHeader().getDef();
-      final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+      RecordBatchDef rbd = batch.getHeader().getDef();
+      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
       // SchemaChangeException, so check/clean catch clause below.
       stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
@@ -230,7 +239,9 @@
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(
+        String.format("You should not call getOutgoingContainer() for class %s",
+            getClass().getCanonicalName()));
   }
 
   @Override
@@ -240,15 +251,15 @@
 
   private void informSenders() {
     logger.info("Informing senders of request to terminate sending.");
-    final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
-      final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
               .setMinorFragmentId(providingEndpoint.getId())
               .build();
-      final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
@@ -262,19 +273,19 @@
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
-    public void failed(final RpcException ex) {
+    public void failed(RpcException ex) {
       logger.warn("Failed to inform upstream that receiver is finished");
     }
 
     @Override
-    public void success(final Ack value, final ByteBuf buffer) {
+    public void success(Ack value, ByteBuf buffer) {
       // Do nothing
     }
 
     @Override
-    public void interrupted(final InterruptedException e) {
+    public void interrupted(InterruptedException e) {
       if (context.getExecutorState().shouldContinue()) {
-        final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+        String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
         logger.error(errMsg, e);
         context.getExecutorState().fail(new RpcException(errMsg, e));
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 0f3517f..99bd6d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -31,12 +31,15 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Unpivot maps. Assumptions are:
@@ -44,7 +47,7 @@
  *  2) Each map contains the same number of fields and field names are also same (types could be different).
  *
  * Example input and output:
- * Schema of input:
+ * Schema of input: <pre>
  *    "schema"        : BIGINT - Schema number. For each schema change this number is incremented.
  *    "computed"      : BIGINT - What time is it computed?
  *    "columns" : MAP - Column names
@@ -69,22 +72,23 @@
  *  "statscount"       : BIGINT
  *  "nonnullstatcount" : BIGINT
  *  .... one column for each map type ...
+ *  </pre>
  */
 public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMaps> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnpivotMapsRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(UnpivotMapsRecordBatch.class);
 
   private final List<String> mapFieldsNames;
   private boolean first = true;
-  private int keyIndex = 0;
-  private List<String> keyList = null;
+  private int keyIndex;
+  private List<String> keyList;
 
-  private Map<MaterializedField, Map<String, ValueVector>> dataSrcVecMap = null;
+  private Map<MaterializedField, Map<String, ValueVector>> dataSrcVecMap;
 
   // Map of non-map fields to VV in the incoming schema
-  private Map<MaterializedField, ValueVector> copySrcVecMap = null;
+  private Map<MaterializedField, ValueVector> copySrcVecMap;
 
   private List<TransferPair> transferList;
-  private int recordCount = 0;
+  private int recordCount;
 
   public UnpivotMapsRecordBatch(UnpivotMaps pop, RecordBatch incoming, FragmentContext context)
       throws OutOfMemoryException {
@@ -150,6 +154,7 @@
     }
   }
 
+  @Override
   public VectorContainer getOutgoingContainer() {
     return this.container;
   }
@@ -170,11 +175,10 @@
 
     keyIndex = (keyIndex + 1) % keyList.size();
     recordCount = outRecordCount;
+    container.setRecordCount(recordCount);
 
     if (keyIndex == 0) {
-      for (VectorWrapper w : incoming) {
-        w.clear();
-      }
+      VectorAccessibleUtilities.clear(incoming.getContainer());
     }
     return IterOutcome.OK;
   }
@@ -270,6 +274,7 @@
     container.clear();
     buildKeyList();
     buildOutputContainer();
+    container.setEmpty();
     return true;
   }
 
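
The UnpivotMaps changes above follow the discipline the new checks enforce: an
operator stamps an explicit record count on its outgoing container, and a
container that carries only a schema is explicitly marked empty rather than
left with an unset count. Below is a minimal, self-contained sketch of that
bookkeeping in plain Java; Container, Column, and isValid are illustrative
names, not Drill classes.

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the record-count invariant: a container is valid only if its
    // declared row count was set and matches every column it holds.
    public class RecordCountSketch {

      static class Column {
        int valueCount;                   // analogous to a value vector's value count
      }

      static class Container {
        final List<Column> columns = new ArrayList<>();
        int recordCount = -1;             // -1 means "never set", the bug the checks catch

        void setRecordCount(int n) {
          recordCount = n;
          for (Column c : columns) {
            c.valueCount = n;
          }
        }

        void setEmpty() {
          setRecordCount(0);              // schema-only batch: count is zero, not unset
        }
      }

      // Analogous to a validator check: reject containers whose count was never
      // stamped or whose columns disagree with the declared count.
      static boolean isValid(Container c) {
        if (c.recordCount < 0) {
          return false;
        }
        for (Column col : c.columns) {
          if (col.valueCount != c.recordCount) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Container out = new Container();
        out.columns.add(new Column());
        out.setEmpty();
        System.out.println(isValid(out));   // true
      }
    }
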
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 5c4f8b4..39189b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -22,19 +22,31 @@
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
 import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
 import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
 import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
 import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
 import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
 import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
 import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
+import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.physical.impl.trace.TraceRecordBatch;
+import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
 import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
+import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
+import org.apache.drill.exec.physical.impl.unpivot.UnpivotMapsRecordBatch;
+import org.apache.drill.exec.physical.impl.window.WindowFrameRecordBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
@@ -223,6 +235,18 @@
     rules.put(MergeJoinBatch.class, CheckMode.VECTORS);
     rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS);
     rules.put(LimitRecordBatch.class, CheckMode.VECTORS);
+    rules.put(MergingRecordBatch.class, CheckMode.VECTORS);
+    rules.put(OrderedPartitionRecordBatch.class, CheckMode.VECTORS);
+    rules.put(RangePartitionRecordBatch.class, CheckMode.VECTORS);
+    rules.put(TraceRecordBatch.class, CheckMode.VECTORS);
+    rules.put(UnionAllRecordBatch.class, CheckMode.VECTORS);
+    rules.put(UnorderedReceiverBatch.class, CheckMode.VECTORS);
+    rules.put(UnpivotMapsRecordBatch.class, CheckMode.VECTORS);
+    rules.put(WindowFrameRecordBatch.class, CheckMode.VECTORS);
+    rules.put(TopNBatch.class, CheckMode.VECTORS);
+    rules.put(HashJoinBatch.class, CheckMode.VECTORS);
+    rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
+    rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
     return rules;
   }
 
@@ -518,9 +542,13 @@
       // empty map vector that causes validation to fail.
       ValueVector child = internalMap.getChild(type.name());
       if (child == null) {
-        error(name, vector, String.format(
-            "Union vector includes type %s, but the internal map has no matching member",
-            type.name()));
+        // Disabling this check for now. TopNBatch, SortBatch
+        // and perhaps others will create vectors with a set of
+        // types, but won't actually populate some of the types.
+        //
+        // error(name, vector, String.format(
+        //     "Union vector includes type %s, but the internal map has no matching member",
+        //     type.name()));
       } else {
         validateVector(name + "-type-" + type.name(),
             valueCount, child);
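
The block above registers twelve more operators in BatchValidator's
class-to-check-mode table. The lookup pattern itself is a map keyed by
operator class with a do-nothing default; here is a compact, runnable sketch
of that pattern with stand-in names (the CheckMode values and the placeholder
class are illustrative, not the Drill definitions).

    import java.util.IdentityHashMap;
    import java.util.Map;

    // Illustrative only: a per-operator-class rule table with a "no checks"
    // default for operators that have not yet been vetted.
    public class RuleTableSketch {

      enum CheckMode { NONE, VECTORS }

      private static final Map<Class<?>, CheckMode> RULES = buildRules();

      private static Map<Class<?>, CheckMode> buildRules() {
        Map<Class<?>, CheckMode> rules = new IdentityHashMap<>();
        // Stand-in for operator classes such as UnionAllRecordBatch, TopNBatch, ...
        rules.put(String.class, CheckMode.VECTORS);
        return rules;
      }

      static CheckMode checkMode(Object batch) {
        return RULES.getOrDefault(batch.getClass(), CheckMode.NONE);
      }

      public static void main(String[] args) {
        System.out.println(checkMode("a batch"));   // VECTORS
        System.out.println(checkMode(42));          // NONE
      }
    }
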
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index cc7a04d..a759399 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -26,18 +26,22 @@
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Named;
 import java.util.List;
 
 
 /**
- * WindowFramer implementation that doesn't support the FRAME clause (will assume the default frame).
- * <br>According to the SQL standard, LEAD, LAG, ROW_NUMBER, NTILE and all ranking functions don't support the FRAME clause.
- * This class will handle such functions.
+ * WindowFramer implementation that doesn't support the FRAME clause (will
+ * assume the default frame). <p>
+ * According to the SQL standard, LEAD, LAG, ROW_NUMBER, NTILE and all ranking
+ * functions don't support the FRAME clause. This class will handle such
+ * functions.
  */
 public abstract class NoFrameSupportTemplate implements WindowFramer {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NoFrameSupportTemplate.class);
+  private static final Logger logger = LoggerFactory.getLogger(NoFrameSupportTemplate.class);
 
   private VectorContainer container;
   private VectorContainer internal;
@@ -53,8 +57,8 @@
   private Partition partition; // current partition being processed
 
   @Override
-  public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
-                    final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException {
+  public void setup(List<WindowDataBatch> batches, VectorContainer container, OperatorContext oContext,
+                    boolean requireFullPartition, WindowPOP popConfig) throws SchemaChangeException {
     this.container = container;
     this.batches = batches;
 
@@ -81,9 +85,7 @@
   @Override
   public void doWork() throws DrillException {
     int currentRow = 0;
-
-    this.current = batches.get(0);
-
+    current = batches.get(0);
     outputCount = current.getRecordCount();
 
     while (currentRow < outputCount) {
@@ -108,10 +110,9 @@
     }
   }
 
-  private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
+  private void newPartition(WindowDataBatch current, int currentRow) throws SchemaChangeException {
     partition = new Partition();
     updatePartitionSize(partition, currentRow);
-
     setupPartition(current, container);
   }
 
@@ -131,16 +132,19 @@
   }
 
   /**
-   * process all rows (computes and writes function values) of current batch that are part of current partition.
-   * @param currentRow first unprocessed row
+   * Process all rows (computes and writes function values) of the current
+   * batch that are part of the current partition.
+   *
+   * @param currentRow
+   *          first unprocessed row
    * @return index of next unprocessed row
-   * @throws DrillException if it can't write into the container
+   * @throws DrillException
+   *           if it can't write into the container
    */
-  private int processPartition(final int currentRow) throws DrillException {
+  private int processPartition(int currentRow) throws DrillException {
     logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
 
     setupCopyNext(current, container);
-
     copyPrevFromInternal();
 
     // copy remaining from current
@@ -198,15 +202,14 @@
     }
   }
 
-  private void processRow(final int row) throws DrillException {
+  private void processRow(int row) throws DrillException {
     if (partition.isFrameDone()) {
       // because all peer rows share the same frame, we only need to compute and aggregate the frame once
-      final long peers = countPeers(row);
+      long peers = countPeers(row);
       partition.newFrame(peers);
     }
 
     outputRow(row, partition);
-
     partition.rowAggregated();
   }
 
@@ -214,7 +217,7 @@
    * updates partition's length after computing the number of rows for the current the partition starting at the specified
    * row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch
    */
-  private void updatePartitionSize(final Partition partition, final int start) {
+  private void updatePartitionSize(Partition partition, int start) {
     logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
 
     long length = 0;
@@ -226,7 +229,7 @@
 
     outer:
     for (WindowDataBatch batch : batches) {
-      final int recordCount = batch.getRecordCount();
+      int recordCount = batch.getRecordCount();
 
       // check first container from start row, and subsequent containers from first row
       for (; row < recordCount; row++, length++) {
@@ -262,18 +265,18 @@
   }
 
   /**
-   * count number of peer rows for current row
+   * Count the number of peer rows for the current row
    * @param start starting row of the current frame
    * @return num peer rows for current row
    * @throws SchemaChangeException
    */
-  private long countPeers(final int start) throws SchemaChangeException {
+  private long countPeers(int start) throws SchemaChangeException {
     long length = 0;
 
     // a single frame can include rows from multiple batches
     // start processing first batch and, if necessary, move to next batches
     for (WindowDataBatch batch : batches) {
-      final int recordCount = batch.getRecordCount();
+      int recordCount = batch.getRecordCount();
 
       // for every remaining row in the partition, count it if it's a peer row
       for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
@@ -308,12 +311,14 @@
         + "]";
   }
 
-
   /**
-   * called once for each row after we evaluate all peer rows. Used to write a value in the row
+   * Called once for each row after we evaluate all peer rows. Used to write a
+   * value into the row.
    *
-   * @param outIndex index of row
-   * @param partition object used by "computed" window functions
+   * @param outIndex
+   *          index of row
+   * @param partition
+   *          object used by "computed" window functions
    */
   public abstract void outputRow(@Named("outIndex") int outIndex,
                                  @Named("partition") Partition partition)
@@ -331,10 +336,13 @@
                        throws SchemaChangeException;
 
   /**
-   * copies value(s) from inIndex row to outIndex row. Mostly used by LEAD. inIndex always points to the row next to
-   * outIndex
-   * @param inIndex source row of the copy
-   * @param outIndex destination row of the copy.
+   * Copies value(s) from inIndex row to outIndex row. Mostly used by LEAD.
+   * inIndex always points to the row next to outIndex
+   *
+   * @param inIndex
+   *          source row of the copy
+   * @param outIndex
+   *          destination row of the copy.
    */
   public abstract void copyNext(@Named("inIndex") int inIndex,
                                 @Named("outIndex") int outIndex)
@@ -344,10 +352,13 @@
                        throws SchemaChangeException;
 
   /**
-   * copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row
+   * Copies value(s) from inIndex row to outIndex row. Mostly used by LAG.
+   * inIndex always points to the previous row
    *
-   * @param inIndex source row of the copy
-   * @param outIndex destination row of the copy.
+   * @param inIndex
+   *          source row of the copy
+   * @param outIndex
+   *          destination row of the copy.
    */
   public abstract void copyPrev(@Named("inIndex") int inIndex,
                                 @Named("outIndex") int outIndex)
@@ -364,12 +375,12 @@
                        throws SchemaChangeException;
 
   /**
-   * reset all window functions
+   * Reset all window functions
    */
   public abstract boolean resetValues() throws SchemaChangeException;
 
   /**
-   * compares two rows from different batches (can be the same), if they have the same value for the partition by
+   * Compares two rows from different batches (which can be the same), checking whether they have the same value for the partition by
    * expression
    * @param b1Index index of first row
    * @param b1 batch for first row
@@ -385,7 +396,7 @@
                           throws SchemaChangeException;
 
   /**
-   * compares two rows from different batches (can be the same), if they have the same value for the order by
+   * Compares two rows from different batches (which can be the same), checking whether they have the same value for the order by
    * expression
    * @param b1Index index of first row
    * @param b1 batch for first row
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
index 55fa647..a7d9a8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
@@ -17,13 +17,17 @@
  */
 package org.apache.drill.exec.physical.impl.window;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * Used internally to keep track of partitions and frames.<br>
- * A partition can be partial, which means we don't know "yet" the total number of records that are part of this partition.
- * Even for partial partitions, we know the number of rows that are part of current frame
+ * Used internally to keep track of partitions and frames.<p>
+ * A partition can be partial, which means we don't yet know the total number
+ * of records that are part of this partition. Even for partial partitions, we
+ * know the number of rows that are part of the current frame.
  */
 public class Partition {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Partition.class);
+  private static final Logger logger = LoggerFactory.getLogger(Partition.class);
 
   private boolean partial; // true if we don't know yet the full length of this partition
   private long length; // size of this partition (if partial is true, then this is a partial length of the partition)
@@ -48,10 +52,13 @@
   public long getLength() {
     return length;
   }
+
   /**
-   * @param length number of rows in this partition
-   * @param partial if true, then length is not the full length of the partition but just the number of rows in the
-   *                current batch
+   * @param length
+   *          number of rows in this partition
+   * @param partial
+   *          if true, then length is not the full length of the partition but
+   *          just the number of rows in the current batch
    */
   public void updateLength(long length, boolean partial) {
     this.length += length;
@@ -62,7 +69,6 @@
   public void rowAggregated() {
     remaining--;
     peers--;
-
     row_number++;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 59e84ef..6ed004f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -21,8 +21,6 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FunctionCall;
@@ -47,23 +45,25 @@
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JExpr;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.codemodel.JExpr;
 
 /**
  * support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
  *
  */
 public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(WindowFrameRecordBatch.class);
 
   private final RecordBatch incoming;
   private List<WindowDataBatch> batches;
 
   private WindowFramer[] framers;
-  private boolean hasOrderBy; // true if window definition contains an order-by clause
   private final List<WindowFunction> functions = Lists.newArrayList();
 
   private boolean noMoreBatches; // true when downstream returns NONE
@@ -71,14 +71,16 @@
 
   private boolean shouldStop; // true if we received an early termination request
 
-  public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+  public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context,
+      RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
     batches = Lists.newArrayList();
   }
 
   /**
-   * Hold incoming batches in memory until all window functions are ready to process the batch on top of the queue
+   * Hold incoming batches in memory until all window functions are ready to
+   * process the batch on top of the queue
    */
   @Override
   public IterOutcome innerNext() {
@@ -104,7 +106,8 @@
       return IterOutcome.NONE;
     }
 
-    // keep saving incoming batches until the first unprocessed batch can be processed, or upstream == NONE
+    // keep saving incoming batches until the first unprocessed batch can be
+    // processed, or upstream == NONE
     while (!noMoreBatches && !canDoWork()) {
       IterOutcome upstream = next(incoming);
       logger.trace("next(incoming) returned {}", upstream);
@@ -124,7 +127,7 @@
             if (schema != null) {
               throw new UnsupportedOperationException("OVER clause doesn't currently support changing schemas.");
             }
-            this.schema = incoming.getSchema();
+            schema = incoming.getSchema();
           }
         case OK:
           if (incoming.getRecordCount() > 0) {
@@ -160,15 +163,13 @@
 
   private void doWork() throws DrillException {
 
-    final WindowDataBatch current = batches.get(0);
-    final int recordCount = current.getRecordCount();
+    WindowDataBatch current = batches.get(0);
+    int recordCount = current.getRecordCount();
 
     logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows", batches.size(), recordCount);
 
     // allocate outgoing vectors
-    for (VectorWrapper<?> w : container) {
-      w.getValueVector().allocateNew();
-    }
+    container.allocateNew();
 
     for (WindowFramer framer : framers) {
       framer.doWork();
@@ -181,10 +182,7 @@
       tp.transfer();
     }
 
-    container.setRecordCount(recordCount);
-    for (VectorWrapper<?> v : container) {
-      v.getValueVector().getMutator().setValueCount(recordCount);
-    }
+    container.setValueCount(recordCount);
 
     // we can safely free the current batch
     current.clear();
@@ -194,8 +192,8 @@
   }
 
   /**
-   * @return true when all window functions are ready to process the current batch (it's the first batch currently
-   * held in memory)
+   * @return true when all window functions are ready to process the current
+   *         batch (it's the first batch currently held in memory)
    */
   private boolean canDoWork() {
     if (batches.size() < 2) {
@@ -204,10 +202,10 @@
       return false;
     }
 
-    final VectorAccessible current = batches.get(0);
-    final int currentSize = current.getRecordCount();
-    final VectorAccessible last = batches.get(batches.size() - 1);
-    final int lastSize = last.getRecordCount();
+    VectorAccessible current = batches.get(0);
+    int currentSize = current.getRecordCount();
+    VectorAccessible last = batches.get(batches.size() - 1);
+    int lastSize = last.getRecordCount();
 
     boolean partitionEndReached;
     boolean frameEndReached;
@@ -215,7 +213,7 @@
       partitionEndReached = !framers[0].isSamePartition(currentSize - 1, current, lastSize - 1, last);
       frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last);
 
-      for (final WindowFunction function : functions) {
+      for (WindowFunction function : functions) {
         if (!function.canDoWork(batches.size(), popConfig, frameEndReached, partitionEndReached)) {
           return false;
         }
@@ -230,18 +228,20 @@
   @Override
   protected void buildSchema() throws SchemaChangeException {
     logger.trace("buildSchema()");
-    final IterOutcome outcome = next(incoming);
+    IterOutcome outcome = next(incoming);
     switch (outcome) {
-      case NONE:
-        state = BatchState.DONE;
-        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-        return;
-      case STOP:
-        state = BatchState.STOP;
-        return;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        return;
+    case NONE:
+      state = BatchState.DONE;
+      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      return;
+    case STOP:
+      state = BatchState.STOP;
+      return;
+    case OUT_OF_MEMORY:
+      state = BatchState.OUT_OF_MEMORY;
+      return;
+    default:
+      break;
     }
 
     try {
@@ -260,30 +260,28 @@
 
     logger.trace("creating framer(s)");
 
-    final List<LogicalExpression> keyExprs = Lists.newArrayList();
-    final List<LogicalExpression> orderExprs = Lists.newArrayList();
+    List<LogicalExpression> keyExprs = Lists.newArrayList();
+    List<LogicalExpression> orderExprs = Lists.newArrayList();
     boolean requireFullPartition = false;
 
     boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate
     boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate
 
-    hasOrderBy = popConfig.getOrderings().size() > 0;
-
     // all existing vectors will be transferred to the outgoing container in framer.doWork()
-    for (final VectorWrapper<?> wrapper : batch) {
+    for (VectorWrapper<?> wrapper : batch) {
       container.addOrGet(wrapper.getField());
     }
 
     // add aggregation vectors to the container, and materialize corresponding expressions
-    for (final NamedExpression ne : popConfig.getAggregations()) {
+    for (NamedExpression ne : popConfig.getAggregations()) {
       if (!(ne.getExpr() instanceof FunctionCall)) {
         throw UserException.functionError()
           .message("Unsupported window function '%s'", ne.getExpr())
           .build(logger);
       }
 
-      final FunctionCall call = (FunctionCall) ne.getExpr();
-      final WindowFunction winfun = WindowFunction.fromExpression(call);
+      FunctionCall call = (FunctionCall) ne.getExpr();
+      WindowFunction winfun = WindowFunction.fromExpression(call);
       if (winfun.materialize(ne, container, context.getFunctionRegistry())) {
         functions.add(winfun);
         requireFullPartition |= winfun.requiresFullPartition(popConfig);
@@ -300,12 +298,12 @@
     container.setRecordCount(0);
 
     // materialize partition by expressions
-    for (final NamedExpression ne : popConfig.getWithins()) {
+    for (NamedExpression ne : popConfig.getWithins()) {
       keyExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(ne.getExpr(), batch, context.getFunctionRegistry()));
     }
 
     // materialize order by expressions
-    for (final Order.Ordering oe : popConfig.getOrderings()) {
+    for (Order.Ordering oe : popConfig.getOrderings()) {
       orderExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(oe.getExpr(), batch, context.getFunctionRegistry()));
     }
 
@@ -328,31 +326,31 @@
     }
   }
 
-  private WindowFramer generateFramer(final List<LogicalExpression> keyExprs, final List<LogicalExpression> orderExprs,
-      final List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
+  private WindowFramer generateFramer(List<LogicalExpression> keyExprs, List<LogicalExpression> orderExprs,
+      List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
 
     TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
       WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION;
-    final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getOptions());
+    ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getOptions());
 
     {
       // generating framer.isSamePartition()
-      final GeneratorMapping IS_SAME_PARTITION_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null);
-      final MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
-      final MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
+      GeneratorMapping IS_SAME_PARTITION_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null);
+      MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
+      MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
       setupIsFunction(cg, keyExprs, isaB1, isaB2);
     }
 
     {
       // generating framer.isPeer()
-      final GeneratorMapping IS_SAME_PEER_READ = GeneratorMapping.create("isPeer", "isPeer", null, null);
-      final MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
-      final MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
+      GeneratorMapping IS_SAME_PEER_READ = GeneratorMapping.create("isPeer", "isPeer", null, null);
+      MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
+      MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
       // isPeer also checks if it's the same partition
       setupIsFunction(cg, Iterables.concat(keyExprs, orderExprs), isaP1, isaP2);
     }
 
-    for (final WindowFunction function : functions) {
+    for (WindowFunction function : functions) {
       // only generate code for the proper window functions
       if (function.supportsCustomFrames() == useCustomFrame) {
         function.generateCode(cg);
@@ -363,7 +361,7 @@
     CodeGenerator<WindowFramer> codeGen = cg.getCodeGenerator();
     codeGen.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    codeGen.saveCodeForDebugging(true);
+    // codeGen.saveCodeForDebugging(true);
 
     return context.getImplementationClass(codeGen);
   }
@@ -371,8 +369,8 @@
   /**
    * setup comparison functions isSamePartition and isPeer
    */
-  private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final Iterable<LogicalExpression> exprs,
-                               final MappingSet leftMapping, final MappingSet rightMapping) {
+  private void setupIsFunction(ClassGenerator<WindowFramer> cg, Iterable<LogicalExpression> exprs,
+                               MappingSet leftMapping, MappingSet rightMapping) {
     cg.setMappingSet(leftMapping);
     for (LogicalExpression expr : exprs) {
       if (expr == null) {
@@ -384,10 +382,10 @@
       cg.setMappingSet(rightMapping);
       ClassGenerator.HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
 
-      final LogicalExpression fh =
+      LogicalExpression fh =
         FunctionGenerationHelper
           .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
-      final ClassGenerator.HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      ClassGenerator.HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
       cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
     cg.getEvalBlock()._return(JExpr.TRUE);
@@ -404,7 +402,7 @@
     }
 
     if (batches != null) {
-      for (final WindowDataBatch bd : batches) {
+      for (WindowDataBatch bd : batches) {
         bd.clear();
       }
       batches = null;
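
WindowFrameRecordBatch holds incoming batches until canDoWork() reports that
the oldest one can be emitted, either because a newer batch proves its last
partition is complete or because the input is exhausted. The sketch below
models that buffering loop with integer partition keys standing in for the
generated isSamePartition comparison; all names are illustrative and nothing
here is Drill API.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;

    // Toy version of the "hold batches until the oldest can be processed" loop.
    // Each int[] is a batch; each value is the partition key of one row.
    public class WindowBufferSketch {

      static void process(List<int[]> input, List<int[]> output) {
        Deque<int[]> held = new ArrayDeque<>();
        int next = 0;
        boolean noMoreBatches = false;

        while (true) {
          // Keep saving incoming batches until the oldest can be emitted.
          while (!noMoreBatches && !canEmitOldest(held)) {
            if (next < input.size()) {
              held.addLast(input.get(next++));
            } else {
              noMoreBatches = true;
            }
          }
          if (held.isEmpty()) {
            return;
          }
          output.add(held.removeFirst());   // stands in for framer.doWork()
        }
      }

      private static boolean canEmitOldest(Deque<int[]> held) {
        if (held.size() < 2) {
          return false;                     // need a newer batch to compare against
        }
        int[] first = held.peekFirst();
        int[] last = held.peekLast();
        // The oldest batch's final partition is complete once a newer batch
        // ends in a different partition key.
        return first[first.length - 1] != last[last.length - 1];
      }

      public static void main(String[] args) {
        List<int[]> out = new ArrayList<>();
        process(Arrays.asList(new int[] {1, 1}, new int[] {1, 2}, new int[] {2, 3}), out);
        System.out.println(out.size());     // 3
      }
    }
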
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index a7964d6..4bb3d38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -29,11 +29,14 @@
 import java.util.List;
 
 public interface WindowFramer {
-  TemplateClassDefinition<WindowFramer> NOFRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, NoFrameSupportTemplate.class);
-  TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class);
+  TemplateClassDefinition<WindowFramer> NOFRAME_TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<>(WindowFramer.class, NoFrameSupportTemplate.class);
+  TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class);
 
-  void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext,
-             final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException;
+  void setup(final List<WindowDataBatch> batches, final VectorContainer container,
+      final OperatorContext operatorContext, final boolean requireFullPartition,
+      final WindowPOP popConfig) throws SchemaChangeException;
 
   /**
    * process the inner batch and write the aggregated values in the container
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index 9587325..3006529 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -28,23 +28,22 @@
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.AllocationHelper;
 
 public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
 
   private SelectionVector4 vector4;
   private List<BatchGroup> batchGroups;
   private VectorAccessible hyperBatch;
   private VectorAccessible outgoing;
   private int size;
-  private int queueSize = 0;
+  private int queueSize;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
-                    VectorAccessible outgoing) throws SchemaChangeException {
+  public void setup(FragmentContext context, BufferAllocator allocator,
+      VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+      VectorAccessible outgoing) throws SchemaChangeException {
     this.hyperBatch = hyperBatch;
     this.batchGroups = batchGroups;
     this.outgoing = outgoing;
@@ -90,20 +90,14 @@
   }
 
   private void setValueCount(int count) {
-    for (VectorWrapper w: outgoing) {
-      w.getValueVector().getMutator().setValueCount(count);
-    }
+    VectorAccessibleUtilities.setValueCount(outgoing, count);
   }
 
   @Override
   public void close() throws IOException {
     vector4.clear();
-    for (final VectorWrapper<?> w: outgoing) {
-      w.getValueVector().clear();
-    }
-    for (final VectorWrapper<?> w : hyperBatch) {
-      w.clear();
-    }
+    VectorAccessibleUtilities.clear(outgoing);
+    VectorAccessibleUtilities.clear(hyperBatch);
 
     for (BatchGroup batchGroup : batchGroups) {
       batchGroup.close();
@@ -123,9 +117,7 @@
   }
 
   private void allocateVectors(int targetRecordCount) {
-    for (VectorWrapper w: outgoing) {
-      AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
-    }
+    VectorAccessibleUtilities.allocateVectors(outgoing, targetRecordCount);
   }
 
   private void siftDown() {
@@ -162,8 +154,8 @@
     return doEval(sv1, sv2);
   }
 
-  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+  public abstract void doSetup(@Named("context") FragmentContext context,
+      @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
   public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 96b9cee..cb835e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -22,14 +22,17 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 /**
- * Implements an AbstractUnaryRecordBatch where the incoming record batch is known at the time of creation
+ * Specialization of AbstractUnaryRecordBatch in which the incoming record
+ * batch is known at the time of creation.
+ *
  * @param <T>
  */
 public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractUnaryRecordBatch<T> {
 
   protected final RecordBatch incoming;
 
-  public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+  public AbstractSingleRecordBatch(T popConfig, FragmentContext context,
+      RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index a7cb01f..808f6ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -23,6 +23,8 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The base class for operators that have a single input. The concrete implementations provide the
@@ -34,9 +36,9 @@
  * @param <T>
  */
 public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
+  private static final Logger logger = LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
 
-  protected boolean outOfMemory = false;
+  protected boolean outOfMemory;
   protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private IterOutcome lastKnownOutcome;
 
@@ -140,18 +142,17 @@
   protected abstract IterOutcome doWork();
 
   /**
-   * Default behavior to handle NULL input (aka FAST NONE): incoming return NONE before return a OK_NEW_SCHEMA:
-   * This could happen when the underneath Scan operators do not produce any batch with schema.
-   *
+   * Default behavior to handle NULL input (aka FAST NONE): the incoming batch
+   * returns NONE before returning OK_NEW_SCHEMA. This can happen when the
+   * underlying scan operators do not produce any batch with a schema.
    * <p>
-   * Notice that NULL input is different from input with an empty batch. In the later case, input provides
-   * at least a batch, thought it's empty.
-   *</p>
-   *
+   * Notice that NULL input is different from input with an empty batch. In the
+   * latter case, the input provides at least one batch, though it is empty.
+   * </p>
    * <p>
-   * This behavior could be override in each individual operator, if the operator's semantics is to
-   * inject a batch with schema.
-   *</p>
+   * This behavior can be overridden in an individual operator if the
+   * operator's semantics require injecting a batch with a schema.
+   * </p>
    *
    * @return IterOutcome.NONE.
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 8361dbe..1f06710 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -89,18 +89,18 @@
     // the schema has changed since the previous call.
 
     // Set up to recognize previous fields that no longer exist.
-    final Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap();
-    for (final VectorWrapper<?> wrapper : container) {
-      final ValueVector vector = wrapper.getValueVector();
+    Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap();
+    for (VectorWrapper<?> wrapper : container) {
+      ValueVector vector = wrapper.getValueVector();
       oldFields.put(vector.getField().getName(), vector);
     }
 
-    final VectorContainer newVectors = new VectorContainer();
+    VectorContainer newVectors = new VectorContainer();
     try {
-      final List<SerializedField> fields = def.getFieldList();
+      List<SerializedField> fields = def.getFieldList();
       int bufOffset = 0;
-      for (final SerializedField field : fields) {
-        final MaterializedField fieldDef = MaterializedField.create(field);
+      for (SerializedField field : fields) {
+        MaterializedField fieldDef = MaterializedField.create(field);
         ValueVector vector = oldFields.remove(fieldDef.getName());
 
         if (vector == null) {
@@ -144,8 +144,8 @@
       }
 
       // rebuild the schema.
-      final SchemaBuilder builder = BatchSchema.newBuilder();
-      for (final VectorWrapper<?> v : newVectors) {
+      SchemaBuilder builder = BatchSchema.newBuilder();
+      for (VectorWrapper<?> v : newVectors) {
         builder.addField(v.getField());
       }
       builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -156,14 +156,12 @@
     } catch (final Throwable cause) {
       // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should
       // adjudicate to call upper layer specific clean up logic.
-      for (final VectorWrapper<?> wrapper:newVectors) {
-        wrapper.getValueVector().clear();
-      }
+      VectorAccessibleUtilities.clear(newVectors);
       throw cause;
     } finally {
       if (! oldFields.isEmpty()) {
         schemaChanged = true;
-        for (final ValueVector vector : oldFields.values()) {
+        for (ValueVector vector : oldFields.values()) {
           vector.clear();
         }
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 04cf32e..e03b17c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -36,7 +36,6 @@
  * RecordIterator will hold onto multiple record batches in order to support resetting beyond record batch boundary.
  */
 public class RecordIterator implements VectorAccessible {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordIterator.class);
 
   private final RecordBatch incoming;
   private final AbstractRecordBatch<?> outgoing;
@@ -48,10 +47,10 @@
   private int markedInnerPosition;
   private long markedOuterPosition;
   private IterOutcome lastOutcome;
-  private int inputIndex;           // For two way merge join 0:left, 1:right
+  private final int inputIndex;           // For two way merge join 0:left, 1:right
   private boolean lastBatchRead;    // True if all batches are consumed.
   private boolean initialized;
-  private OperatorContext oContext;
+  private final OperatorContext oContext;
   private final boolean enableMarkAndReset;
 
   private final VectorContainer container; // Holds VectorContainer of current record batch
@@ -217,6 +216,7 @@
               container.addOrGet(w.getField());
             }
             container.buildSchema(rbd.getContainer().getSchema().getSelectionVectorMode());
+            container.setEmpty();
             initialized = true;
           }
           if (innerRecordCount > 0) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 6d4e057..1cfc61d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -32,6 +32,7 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -124,9 +125,15 @@
    * Transfer vectors from containerIn to this.
    */
   public void transferIn(VectorContainer containerIn) {
-    Preconditions.checkArgument(this.wrappers.size() == containerIn.wrappers.size());
-    for (int i = 0; i < this.wrappers.size(); ++i) {
-      containerIn.wrappers.get(i).transfer(this.wrappers.get(i));
+    rawTransferIn(containerIn);
+    setRecordCount(containerIn.getRecordCount());
+  }
+
+  @VisibleForTesting
+  public void rawTransferIn(VectorContainer containerIn) {
+    Preconditions.checkArgument(wrappers.size() == containerIn.wrappers.size());
+    for (int i = 0; i < wrappers.size(); ++i) {
+      containerIn.wrappers.get(i).transfer(wrappers.get(i));
     }
   }
 
@@ -237,14 +244,14 @@
     * @param srcIndex The index of the row to copy from the source {@link VectorContainer}.
     * @return Position one above where the row was appended
     */
-    public int appendRow(VectorContainer srcContainer, int srcIndex) {
-      for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
-        ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
-        ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
-        destVector.copyEntry(recordCount, srcVector, srcIndex);
-      }
-      return incRecordCount();
+  public int appendRow(VectorContainer srcContainer, int srcIndex) {
+    for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
+      ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+      ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
+      destVector.copyEntry(recordCount, srcVector, srcIndex);
     }
+    return incRecordCount();
+  }
 
   public TypedFieldId add(ValueVector vv) {
     schemaChanged();
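
With the change above, transferIn() propagates the source's record count along
with its vectors, while rawTransferIn() keeps the count-agnostic behavior for
tests that manage counts themselves. A toy model of that split, using lists
instead of value vectors (none of these names are Drill API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Toy model: "transfer" moves column data between containers. The public
    // entry point also propagates the record count; the raw variant leaves the
    // destination count untouched for callers that manage it explicitly.
    public class TransferInSketch {

      static class Container {
        final List<List<Integer>> columns = new ArrayList<>();
        int recordCount;

        void transferIn(Container source) {
          rawTransferIn(source);
          recordCount = source.recordCount;   // keep the count consistent with the data
        }

        void rawTransferIn(Container source) {
          if (columns.size() != source.columns.size()) {
            throw new IllegalArgumentException("column count mismatch");
          }
          for (int i = 0; i < columns.size(); i++) {
            columns.get(i).clear();
            columns.get(i).addAll(source.columns.get(i));
            source.columns.get(i).clear();    // the source gives up its data
          }
        }
      }

      public static void main(String[] args) {
        Container src = new Container();
        src.columns.add(new ArrayList<>(Arrays.asList(7, 8, 9)));
        src.recordCount = 3;

        Container dest = new Container();
        dest.columns.add(new ArrayList<>());
        dest.transferIn(src);
        System.out.println(dest.recordCount + " rows: " + dest.columns.get(0));  // 3 rows: [7, 8, 9]
      }
    }
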
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 9e31ff8..577517d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -31,10 +31,10 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
- * A specialized version of record batch that can moves out buffers and preps them for writing.
+ * A specialized version of a record batch that moves out buffers and preps
+ * them for writing.
  */
 public class WritableBatch implements AutoCloseable {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class);
 
   private final RecordBatchDef def;
   private final DrillBuf[] buffers;
@@ -74,7 +74,8 @@
   public void reconstructContainer(BufferAllocator allocator, VectorContainer container) {
     Preconditions.checkState(!cleared,
         "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
-    if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */
+    // If we have DrillBufs associated with value vectors
+    if (buffers.length > 0) {
       int len = 0;
       for (DrillBuf b : buffers) {
         len += b.capacity();
@@ -82,7 +83,7 @@
 
       DrillBuf newBuf = allocator.buffer(len);
       try {
-        /* Copy data from each buffer into the compound buffer */
+        // Copy data from each buffer into the compound buffer
         int offset = 0;
         for (DrillBuf buf : buffers) {
           newBuf.setBytes(offset, buf);
@@ -94,9 +95,9 @@
 
         int bufferOffset = 0;
 
-        /*
-         * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
-         */
+        // For each value vector, slice up the appropriate size from the
+        // compound buffer and load it into the value vector.
+
         int vectorIndex = 0;
 
         for (VectorWrapper<?> vv : container) {
@@ -120,16 +121,11 @@
       svMode = SelectionVectorMode.NONE;
     }
     container.buildSchema(svMode);
-
-    /* Set the record count in the value vector */
-    for (VectorWrapper<?> v : container) {
-      ValueVector.Mutator m = v.getValueVector().getMutator();
-      m.setValueCount(def.getRecordCount());
-    }
+    container.setValueCount(def.getRecordCount());
   }
 
   public void clear() {
-    if(cleared) {
+    if (cleared) {
       return;
     }
     for (DrillBuf buf : buffers) {
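
reconstructContainer() above copies every per-vector buffer into one compound
buffer and then hands each vector back a slice of the appropriate size. The
same compose-then-slice pattern can be shown with plain java.nio buffers; the
byte values and lengths below are made up.

    import java.nio.ByteBuffer;

    // Compose several small buffers into one compound buffer, then slice views
    // of the original lengths back out of it.
    public class ComposeAndSliceSketch {

      public static void main(String[] args) {
        ByteBuffer[] buffers = {
            ByteBuffer.wrap(new byte[] {1, 2, 3}),
            ByteBuffer.wrap(new byte[] {4, 5})
        };

        // Copy data from each buffer into the compound buffer.
        int len = 0;
        for (ByteBuffer b : buffers) {
          len += b.remaining();
        }
        ByteBuffer compound = ByteBuffer.allocate(len);
        for (ByteBuffer b : buffers) {
          compound.put(b.duplicate());
        }

        // For each original buffer, slice the appropriate size from the
        // compound buffer (analogous to loading a value vector from its slice).
        int offset = 0;
        for (ByteBuffer b : buffers) {
          int size = b.capacity();
          ByteBuffer view = compound.duplicate();
          view.position(offset);
          view.limit(offset + size);
          System.out.println(size + " bytes at offset " + offset
              + ", first byte " + view.get(offset));
          offset += size;
        }
      }
    }
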
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 69500c0..99b0723 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -43,6 +43,8 @@
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Encapsulates the future management of query submissions.  This entails a
@@ -56,8 +58,8 @@
  * handle this case and then do a switcheroo.
  */
 public class QueryResultHandler {
-  private static final org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(QueryResultHandler.class);
 
   /**
    * Current listener for results, for each active query.
@@ -74,7 +76,7 @@
   }
 
   public RpcConnectionHandler<UserToBitConnection> getWrappedConnectionHandler(
-      final RpcConnectionHandler<UserToBitConnection> handler) {
+      RpcConnectionHandler<UserToBitConnection> handler) {
     return new ChannelClosedHandler(handler);
   }
 
@@ -82,11 +84,11 @@
    * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
    * handles data result messages
    */
-  public void resultArrived( ByteBuf pBody ) throws RpcException {
-    final QueryResult queryResult = RpcBus.get( pBody, QueryResult.PARSER );
+  public void resultArrived(ByteBuf pBody) throws RpcException {
+    QueryResult queryResult = RpcBus.get(pBody, QueryResult.PARSER);
 
-    final QueryId queryId = queryResult.getQueryId();
-    final QueryState queryState = queryResult.getQueryState();
+    QueryId queryId = queryResult.getQueryId();
+    QueryState queryState = queryResult.getQueryState();
 
     if (logger.isDebugEnabled()) {
       logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
@@ -95,18 +97,18 @@
 
     assert queryResult.hasQueryState() : "received query result without QueryState";
 
-    final boolean isFailureResult = QueryState.FAILED == queryState;
+    boolean isFailureResult = QueryState.FAILED == queryState;
     // CANCELED queries are handled the same way as COMPLETED
-    final boolean isTerminalResult;
-    switch ( queryState ) {
+    boolean isTerminalResult;
+    switch (queryState) {
       case FAILED:
       case CANCELED:
       case COMPLETED:
         isTerminalResult = true;
         break;
       default:
-        logger.error( "Unexpected/unhandled QueryState " + queryState
-          + " (for query " + queryId +  ")" );
+        logger.error("Unexpected/unhandled QueryState " + queryState
+          + " (for query " + queryId +  ")");
         isTerminalResult = false;
         break;
     }
@@ -127,19 +129,19 @@
 
         try {
           resultsListener.queryCompleted(queryState);
-        } catch ( Exception e ) {
+        } catch (Exception e) {
           resultsListener.submissionFailed(UserException.systemError(e).build(logger));
         }
       } else {
         logger.warn("queryState {} was ignored", queryState);
       }
     } finally {
-      if ( isTerminalResult ) {
+      if (isTerminalResult) {
         // TODO:  What exactly are we checking for?  How should we really check
         // for it?
-        if ( (! ( resultsListener instanceof BufferingResultsListener )
-          || ((BufferingResultsListener) resultsListener).output != null ) ) {
-          queryIdToResultsListenersMap.remove( queryId, resultsListener );
+        if ((! (resultsListener instanceof BufferingResultsListener)
+          || ((BufferingResultsListener) resultsListener).output != null)) {
+          queryIdToResultsListenersMap.remove(queryId, resultsListener);
         }
       }
     }
@@ -149,50 +151,52 @@
    * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
    * handles query data messages
    */
-  public void batchArrived( ConnectionThrottle throttle,
-                            ByteBuf pBody, ByteBuf dBody ) throws RpcException {
-    final QueryData queryData = RpcBus.get( pBody, QueryData.PARSER );
+  public void batchArrived(ConnectionThrottle throttle,
+                            ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    QueryData queryData = RpcBus.get(pBody, QueryData.PARSER);
     // Current batch coming in.
-    final DrillBuf drillBuf = (DrillBuf) dBody;
-    final QueryDataBatch batch = new QueryDataBatch( queryData, drillBuf );
+    DrillBuf drillBuf = (DrillBuf) dBody;
+    QueryDataBatch batch = new QueryDataBatch(queryData, drillBuf);
 
-    final QueryId queryId = queryData.getQueryId();
+    QueryId queryId = queryData.getQueryId();
 
     if (logger.isDebugEnabled()) {
       logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
     }
-    logger.trace( "batchArrived: batch = {}", batch );
+    logger.trace("batchArrived: batch = {}", batch);
 
-    final UserResultsListener resultsListener = newUserResultsListener(queryId);
+    UserResultsListener resultsListener = newUserResultsListener(queryId);
 
     // A data case--pass on via dataArrived
     try {
       resultsListener.dataArrived(batch, throttle);
       // That releases batch if successful.
-    } catch ( Exception e ) {
+    } catch (Exception e) {
       batch.release();
       resultsListener.submissionFailed(UserException.systemError(e).build(logger));
     }
   }
 
   /**
-   * Return {@link UserResultsListener} associated with queryId. Will create a new {@link BufferingResultsListener}
-   * if no listener found.
-   * @param queryId queryId we are getting the listener for
+   * Return {@link UserResultsListener} associated with queryId. Will create a
+   * new {@link BufferingResultsListener} if no listener is found.
+   *
+   * @param queryId
+   *          queryId we are getting the listener for
    * @return {@link UserResultsListener} associated with queryId
    */
   private UserResultsListener newUserResultsListener(QueryId queryId) {
-    UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
-    logger.trace( "For QueryId [{}], retrieved results listener {}", queryId, resultsListener );
-    if ( null == resultsListener ) {
+    UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId);
+    logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener);
+    if (null == resultsListener) {
       // WHO?? didn't get query ID response and set submission listener yet,
       // so install a buffering listener for now
 
       BufferingResultsListener bl = new BufferingResultsListener();
-      resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
+      resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl);
       // If we had a successful insertion, use that reference.  Otherwise, just
       // throw away the new buffering listener.
-      if ( null == resultsListener ) {
+      if (null == resultsListener) {
         resultsListener = bl;
       }
     }
@@ -201,7 +205,7 @@
 
   private static class BufferingResultsListener implements UserResultsListener {
 
-    private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
+    private final ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
     private volatile UserException ex;
     private volatile QueryState queryState;
     private volatile UserResultsListener output;
@@ -252,8 +256,10 @@
     @Override
     public void submissionFailed(UserException ex) {
       assert queryState == null;
-      // there is one case when submissionFailed() is called even though the query didn't fail on the server side
-      // it happens when UserResultsListener.batchArrived() throws an exception that will be passed to
+      // There is one case where submissionFailed() is called even though the
+      // query didn't fail on the server side: it happens when
+      // UserResultsListener.batchArrived() throws an exception that is then
+      // passed to
       // submissionFailed() by QueryResultHandler.dataArrived()
       queryState = QueryState.FAILED;
       synchronized (this) {
@@ -335,7 +341,7 @@
     }
 
     @Override
-    public void interrupted(final InterruptedException ex) {
+    public void interrupted(InterruptedException ex) {
       if (!isTerminal.compareAndSet(false, true)) {
         logger.warn("Received multiple responses to run query request.");
         return;
@@ -350,26 +356,27 @@
   }
 
   /**
-   * When a {@link UserToBitConnection connection} to a server is successfully created, this handler adds a
-   * listener to that connection that listens to connection closure. If the connection is closed, all active
+   * When a {@link UserToBitConnection connection} to a server is successfully
+   * created, this handler adds a listener to that connection that listens to
+   * connection closure. If the connection is closed, all active
    * {@link UserResultsListener result listeners} are failed.
    */
   private class ChannelClosedHandler implements RpcConnectionHandler<UserToBitConnection> {
 
     private final RpcConnectionHandler<UserToBitConnection> parentHandler;
 
-    public ChannelClosedHandler(final RpcConnectionHandler<UserToBitConnection> parentHandler) {
+    public ChannelClosedHandler(RpcConnectionHandler<UserToBitConnection> parentHandler) {
       this.parentHandler = parentHandler;
     }
 
     @Override
-    public void connectionSucceeded(final UserToBitConnection connection) {
+    public void connectionSucceeded(UserToBitConnection connection) {
       connection.getChannel().closeFuture().addListener(
           new GenericFutureListener<Future<? super Void>>() {
             @Override
             public void operationComplete(Future<? super Void> future)
                 throws Exception {
-              for (final UserResultsListener listener : queryIdToResultsListenersMap.values()) {
+              for (UserResultsListener listener : queryIdToResultsListenersMap.values()) {
                 listener.submissionFailed(UserException.connectionError()
                     .message("Connection %s closed unexpectedly. Drillbit down?",
                         connection.getName())
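// A library-agnostic sketch of the handler described above: when the
// connection's close future completes, fail every listener still registered.
// CompletableFuture and the Listener interface are stand-ins, not Drill's RPC
// or Netty types.
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class CloseWatcher {
  interface Listener {
    void submissionFailed(Exception e);
  }

  private final Map<String, Listener> activeListeners = new ConcurrentHashMap<>();

  void watch(CompletableFuture<Void> closeFuture, String connectionName) {
    closeFuture.whenComplete((ignored, err) ->
        activeListeners.values().forEach(listener ->
            listener.submissionFailed(new RuntimeException(
                "Connection " + connectionName + " closed unexpectedly."))));
  }
}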
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
index 865aae4..223fecf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
@@ -26,7 +26,6 @@
 
 @Category(OperatorTest.class)
 public class TestAggNullable extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestAggNullable.class);
 
   private static void enableAggr(boolean ha, boolean sa) throws Exception {
 
@@ -72,5 +71,4 @@
     assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
         expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 3a02544..ddb1e5b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -37,6 +37,7 @@
 
 @Category({SqlFunctionTest.class, OperatorTest.class, PlannerTest.class, UnlikelyTest.class})
 public class TestExampleQueries extends BaseTestQuery {
+
   @BeforeClass
   public static void setupTestFiles() {
     dirTestWatcher.copyResourceToRoot(Paths.get("tpchmulti"));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
index d3cf0d8..f79eb60 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
@@ -36,7 +36,6 @@
 
 @Category({SqlTest.class, PlannerTest.class})
 public class TestStarQueries extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStarQueries.class);
 
   @BeforeClass
   public static void setupTestFiles() {
@@ -298,7 +297,7 @@
     try {
       test("select x.n_nationkey, x.n_name, x.n_regionkey, x.r_name from (select * from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r where n.n_regionkey = r.r_regionkey) x " );
     } catch (UserException e) {
-      logger.info("***** Test resulted in expected failure: " + e.getMessage());
+      // Expected
       throw e;
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 3523bdf..292602f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -347,7 +347,6 @@
         .baselineColumns("n_nationkey", "n_regionkey", "n_name")
         .build().run();
 
-
     testBuilder()
         .sqlQuery(query2)
         .unOrdered()
@@ -355,7 +354,7 @@
         .baselineTypes(TypeProtos.MinorType.INT, TypeProtos.MinorType.INT, TypeProtos.MinorType.VARCHAR)
         .baselineColumns("n_nationkey", "n_regionkey", "n_name")
         .build().run();
-    }
+  }
 
   @Test // see DRILL-1977, DRILL-2376, DRILL-2377, DRILL-2378, DRILL-2379
   @Category(UnlikelyTest.class)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
index 0ffd04c..a4d10b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
@@ -51,6 +51,7 @@
  */
 @Category({SlowTest.class, PlannerTest.class})
 public class DrillSeparatePlanningTest extends ClusterTest {
+
   @BeforeClass
   public static void setupFiles() {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "json"));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
index de8a77e..fd43fa1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
@@ -39,54 +39,67 @@
   public void test2() throws Exception {
     test("alter session set `planner.enable_hashjoin` = false");
     test("alter session set `planner.slice_target` = 1");
-    test("SELECT r.r_name, \n" +
-            "       t1.f \n" +
-            "FROM   cp.`tpch/region.parquet` r \n" +
-            "       JOIN (SELECT Flatten(x) AS f \n" +
-            "             FROM   (SELECT Convert_from('[0, 1]', 'json') AS x \n" +
-            "                     FROM   cp.`tpch/region.parquet`)) t1 \n" +
-            "         ON t1.f = cast(r.r_regionkey as bigint) \n" +
-            "ORDER  BY r.r_name");
-    test("alter session set `planner.enable_hashjoin` = true");
-    test("alter session set `planner.slice_target` = 1000000");
+    try {
+      test("SELECT r.r_name, \n" +
+              "       t1.f \n" +
+              "FROM   cp.`tpch/region.parquet` r \n" +
+              "       JOIN (SELECT Flatten(x) AS f \n" +
+              "             FROM   (SELECT Convert_from('[0, 1]', 'json') AS x \n" +
+              "                     FROM   cp.`tpch/region.parquet`)) t1 \n" +
+              "         ON t1.f = cast(r.r_regionkey as bigint) \n" +
+              "ORDER  BY r.r_name");
+    } finally {
+      test("alter session reset `planner.enable_hashjoin`");
+      test("alter session reset `planner.slice_target`");
+    }
   }
 
   @Test
   public void test3() throws Exception {
     test("alter session set `planner.enable_hashjoin` = false");
     test("alter session set `planner.slice_target` = 1");
-    test("select f from\n" +
-            "(select convert_from(nation, 'json') as f from\n" +
-            "(select concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', r.r_regionkey, '}') as nation\n" +
-            "       from cp.`tpch/nation.parquet` n,\n" +
-            "            cp.`tpch/region.parquet` r\n" +
-            "        where \n" +
-            "        n.n_regionkey = r.r_regionkey)) t\n" +
-            "order by t.f.name");
+    try {
+      test("select f from\n" +
+              "(select convert_from(nation, 'json') as f from\n" +
+              "(select concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', r.r_regionkey, '}') as nation\n" +
+              "       from cp.`tpch/nation.parquet` n,\n" +
+              "            cp.`tpch/region.parquet` r\n" +
+              "        where \n" +
+              "        n.n_regionkey = r.r_regionkey)) t\n" +
+              "order by t.f.name");
+    } finally {
+      test("alter session reset `planner.enable_hashjoin`");
+      test("alter session reset `planner.slice_target`");
+    }
   }
 
   @Test
   public void test4() throws Exception {
     test("alter session set `planner.enable_hashjoin` = false");
     test("alter session set `planner.slice_target` = 1");
-    test("SELECT f \n" +
-            "FROM   (SELECT Convert_from(nation, 'json') AS f \n" +
-            "        FROM   (SELECT Concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', \n" +
-            "                       r.r_regionkey, \n" +
-            "                               '}') AS \n" +
-            "                       nation \n" +
-            "                FROM   cp.`tpch/nation.parquet` n, \n" +
-            "                       cp.`tpch/region.parquet` r \n" +
-            "                WHERE  n.n_regionkey = r.r_regionkey \n" +
-            "                       AND r.r_regionkey = 4)) t \n" +
-            "ORDER  BY t.f.name");
+    try {
+      test("SELECT f \n" +
+              "FROM   (SELECT Convert_from(nation, 'json') AS f \n" +
+              "        FROM   (SELECT Concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', \n" +
+              "                       r.r_regionkey, \n" +
+              "                               '}') AS \n" +
+              "                       nation \n" +
+              "                FROM   cp.`tpch/nation.parquet` n, \n" +
+              "                       cp.`tpch/region.parquet` r \n" +
+              "                WHERE  n.n_regionkey = r.r_regionkey \n" +
+              "                       AND r.r_regionkey = 4)) t \n" +
+              "ORDER  BY t.f.name");
+    } finally {
+      test("alter session reset `planner.enable_hashjoin`");
+      test("alter session reset `planner.slice_target`");
+    }
   }
 
   @Test //DRILL-4783 when resultset is empty, don't throw exception.
   @Category(UnlikelyTest.class)
   public void test5() throws Exception {
 
-    //when there is no incoming record, flatten won't throw exception
+    // when there is no incoming record, flatten won't throw exception
     testBuilder().sqlQuery("select flatten(j) from \n" +
            " (select convert_from(names, 'json') j \n" +
            " from (select concat('[\"', first_name, '\", ', '\"', last_name, '\"]') names \n" +
@@ -94,7 +107,7 @@
         .expectsEmptyResultSet()
         .build().run();
 
-    //result is not empty and is list type,
+    // result is not empty and is a list type
     testBuilder().sqlQuery("select flatten(j) n from \n" +
         " (select convert_from(names, 'json') j \n" +
         " from (select concat('[\"', first_name, '\", ', '\"', last_name, '\"]') names \n" +
@@ -105,7 +118,7 @@
         .baselineValues("Nowmer")
         .build().run();
 
-    //result is not empty, and flatten got incompatible (non-list) incoming records. got exception thrown
+    // result is not empty, and flatten gets incompatible (non-list) incoming records, so an exception is thrown
     errorMsgTestHelper("select flatten(first_name) from \n" +
         "(select first_name from cp.`employee.json` where first_name='Sheri')",
         "Flatten does not support inputs of non-list values");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 51e47ba..2d1a1ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -77,9 +77,6 @@
     this.allocator = context.getAllocator();
     this.container = new VectorContainer(allocator, schema);
     this.allOutcomes = iterOutcomes;
-    this.currentContainerIndex = 0;
-    this.currentOutcomeIndex = 0;
-    this.isDone = false;
   }
 
   @Deprecated
@@ -193,7 +190,7 @@
   @Override
   public IterOutcome next() {
 
-    if(isDone) {
+    if (isDone) {
       return IterOutcome.NONE;
     }
 
@@ -213,16 +210,18 @@
       switch (rowSet.indirectionType()) {
         case NONE:
         case TWO_BYTE:
-          container.transferIn(input);
-          if ( input.hasRecordCount() ) { // in case special test of uninitialized input container
-            container.setRecordCount(input.getRecordCount());
+          if (input.hasRecordCount()) { // false only for the special test of an uninitialized input container
+            container.transferIn(input);
+          } else {
+            // Not normally a valid condition, supported here just for testing
+            container.rawTransferIn(input);
           }
           SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
 
           if (sv2 != null) {
             // Operators assume that new values for an Sv2 are transferred in.
             sv2.allocateNewSafe(inputSv2.getCount());
-            for (int i=0; i<inputSv2.getCount(); ++i) {
+            for (int i = 0; i < inputSv2.getCount(); ++i) {
               sv2.setIndex(i, inputSv2.getIndex(i));
             }
             sv2.setRecordCount(inputSv2.getCount());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index 376ac03..87a324f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -57,8 +57,6 @@
 
   /**
    *  A template for Hash Aggr spilling tests
-   *
-   * @throws Exception
    */
   private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback, boolean predict,
                          String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception {
@@ -84,8 +82,6 @@
   /**
    * Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
    * ("normal spill" means spill-cycle = 1 )
-   *
-   * @throws Exception
    */
   @Test
   public void testSimpleHashAggrSpill() throws Exception {
@@ -96,8 +92,6 @@
   /**
    * Test with "needed memory" prediction turned off
    * (i.e., exercise code paths that catch OOMs from the Hash Table and recover)
-   *
-   * @throws Exception
    */
   @Test
   @Ignore("DRILL-7301")
@@ -126,9 +120,8 @@
   }
 
   /**
-   * Test Secondary and Tertiary spill cycles - Happens when some of the spilled partitions cause more spilling as they are read back
-   *
-   * @throws Exception
+   * Test Secondary and Tertiary spill cycles - Happens when some of the spilled
+   * partitions cause more spilling as they are read back
    */
   @Test
   public void testHashAggrSecondaryTertiarySpill() throws Exception {
@@ -139,9 +132,8 @@
   }
 
   /**
-   * Test with the "fallback" option disabled: When not enough memory available to allow spilling, then fail (Resource error) !!
-   *
-   * @throws Exception
+   * Test with the "fallback" option disabled: When not enough memory available
+   * to allow spilling, then fail (Resource error) !!
    */
   @Test
   public void testHashAggrFailWithFallbackDisabed() throws Exception {
@@ -158,10 +150,10 @@
   }
 
   /**
-   * Test with the "fallback" option ON: When not enough memory is available to allow spilling (internally need enough memory to
-   * create multiple partitions), then behave like the pre-1.11 Hash Aggregate: Allocate unlimited memory, no spill.
-   *
-   * @throws Exception
+   * Test with the "fallback" option ON: When not enough memory is available to
+   * allow spilling (internally need enough memory to create multiple
+   * partitions), then behave like the pre-1.11 Hash Aggregate: Allocate
+   * unlimited memory, no spill.
    */
   @Test
   public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 7610e71..d2485f1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -276,7 +276,7 @@
   @Test
   @Category(UnlikelyTest.class)
   public void drill1652() throws Exception {
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select uid, flatten(transactions) from dfs.`tmp/bigfile.json`");
     }
   }
@@ -330,7 +330,7 @@
   public void testKVGenFlatten2() throws Exception {
     // currently runs
     // TODO - re-verify results by hand
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select flatten(kvgen(visited_cellid_counts)) as mytb from dfs.`tmp/mapkv.json`");
     }
   }
@@ -351,7 +351,7 @@
 
     // FIXED BY RETURNING PROPER SCHEMA DURING FAST SCHEMA STEP
     // these types of problems are being solved more generally as we develp better support for chaning schema
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select celltbl.catl from (\n" +
           "        select flatten(categories) catl from dfs.`tmp/yelp_academic_dataset_business.json` b limit 100\n" +
           "    )  celltbl where celltbl.catl = 'Doctors'");
@@ -360,7 +360,7 @@
 
   @Test
   public void countAggFlattened() throws Exception {
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select celltbl.catl, count(celltbl.catl) from ( " +
           "select business_id, flatten(categories) catl from dfs.`tmp/yelp_academic_dataset_business.json` b limit 100 " +
           ")  celltbl group by celltbl.catl limit 10 ");
@@ -369,35 +369,33 @@
 
   @Test
   public void flattenAndAdditionalColumn() throws Exception {
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select business_id, flatten(categories) from dfs.`tmp/yelp_academic_dataset_business.json` b");
     }
   }
 
   @Test
   public void testFailingFlattenAlone() throws Exception {
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select flatten(categories) from dfs.`tmp/yelp_academic_dataset_business.json` b  ");
     }
   }
 
   @Test
   public void testDistinctAggrFlattened() throws Exception {
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test(" select distinct(celltbl.catl) from (\n" +
           "        select flatten(categories) catl from dfs.`tmp/yelp_academic_dataset_business.json` b\n" +
           "    )  celltbl");
     }
-
   }
 
   @Test
   @Category(UnlikelyTest.class)
   public void testDrill1665() throws Exception {
-    if(RUN_ADVANCED_TESTS){
+    if (RUN_ADVANCED_TESTS) {
       test("select id, flatten(evnts) as rpt from dfs.`tmp/drill1665.json`");
     }
-
   }
 
   @Test
@@ -479,7 +477,6 @@
         .unOrdered()
         .jsonBaselineFile("flatten/drill-2106-result.json")
         .go();
-
   }
 
   @Test // see DRILL-2146
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index a015ff7..5530176 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -36,6 +36,7 @@
 import org.junit.experimental.categories.Category;
 
 public class TestWindowFrame extends BaseTestQuery {
+
   @BeforeClass
   public static void setupMSortBatchSize() throws IOException {
     // make sure memory sorter outputs 20 rows per batch
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
index f828cf7..01110c4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.drill.PlanTestBase;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -69,7 +68,8 @@
           .baselineValues("`sales_district_id`", 110.0, 110.0, 23L, 8.0)
           .go();
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
@@ -96,7 +96,8 @@
           .baselineValues("`birth_date`", 1155.0, 1155.0, 52L, 10.0)
           .go();
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
@@ -124,8 +125,9 @@
               .baselineValues("`birth_date`", 1138.0, 1138.0, 38L, 10.001597699312988)
               .go();
     } finally {
-      test("ALTER SESSION SET `exec.statistics.deterministic_sampling` = false");
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("exec.statistics.deterministic_sampling");
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
@@ -143,7 +145,9 @@
       test("ALTER SESSION SET `planner.statistics.use` = true");
       test("SELECT * FROM dfs.tmp.`lineitem` l JOIN dfs.tmp.`orders` o ON l.l_orderkey = o.o_orderkey");
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
+      resetSessionOption("planner.statistics.use");
     }
   }
 
@@ -166,7 +170,8 @@
       verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.employee_basic4 COMPUTE STATISTICS",
           "16");
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
@@ -204,7 +209,8 @@
           .baselineValues("`dir1`", 120.0, 120.0, 4L, 2.0)
           .go();
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
@@ -214,21 +220,26 @@
     final String tmpLocation = "/multilevel/parquet";
     test("ALTER SESSION SET `planner.slice_target` = 1");
     test("ALTER SESSION SET `store.format` = 'parquet'");
-    test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
-         "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
-    verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
-    verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS",
-        "Table parquetStale has not changed since last ANALYZE!");
-    // Verify we recompute statistics once a new file/directory is added. Update the directory some
-    // time after ANALYZE so that the timestamps are different.
-    Thread.sleep(1000);
-    final String Q4 = "/multilevel/parquet/1996/Q4";
-    test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
-         "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
-    verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
-    Thread.sleep(1000);
-    test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
-    verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+    try {
+      test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+           "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
+      verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+      verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS",
+          "Table parquetStale has not changed since last ANALYZE!");
+      // Verify we recompute statistics once a new file/directory is added. Update the directory some
+      // time after ANALYZE so that the timestamps are different.
+      Thread.sleep(1000);
+      final String Q4 = "/multilevel/parquet/1996/Q4";
+      test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+           "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
+      verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+      Thread.sleep(1000);
+      test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
+      verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+    } finally {
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
+    }
   }
 
   @Test
@@ -236,99 +247,104 @@
     //Test ndv/rowcount for scan
     test("ALTER SESSION SET `planner.slice_target` = 1");
     test("ALTER SESSION SET `store.format` = 'parquet'");
-    test("CREATE TABLE dfs.tmp.employeeUseStat AS SELECT * from cp.`employee.json`");
-    test("CREATE TABLE dfs.tmp.departmentUseStat AS SELECT * from cp.`department.json`");
-    test("ANALYZE TABLE dfs.tmp.employeeUseStat COMPUTE STATISTICS");
-    test("ANALYZE TABLE dfs.tmp.departmentUseStat COMPUTE STATISTICS");
-    test("ALTER SESSION SET `planner.statistics.use` = true");
-    String query = " select employee_id from dfs.tmp.employeeUseStat where department_id = 2";
-    String[] expectedPlan1 = {"Filter\\(condition.*\\).*rowcount = 96.25,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
+    try {
+      test("CREATE TABLE dfs.tmp.employeeUseStat AS SELECT * from cp.`employee.json`");
+      test("CREATE TABLE dfs.tmp.departmentUseStat AS SELECT * from cp.`department.json`");
+      test("ANALYZE TABLE dfs.tmp.employeeUseStat COMPUTE STATISTICS");
+      test("ANALYZE TABLE dfs.tmp.departmentUseStat COMPUTE STATISTICS");
+      test("ALTER SESSION SET `planner.statistics.use` = true");
+      String query = " select employee_id from dfs.tmp.employeeUseStat where department_id = 2";
+      String[] expectedPlan1 = {"Filter\\(condition.*\\).*rowcount = 96.25,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
 
-    query = " select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5)";
-    String[] expectedPlan2 = {"Filter\\(condition.*\\).*rowcount = 192.5,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
+      query = " select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5)";
+      String[] expectedPlan2 = {"Filter\\(condition.*\\).*rowcount = 192.5,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
 
-    query = "select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5) and employee_id = 5";
-    String[] expectedPlan3 = {"Filter\\(condition.*\\).*rowcount = 1.0,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan3, new String[]{});
+      query = "select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5) and employee_id = 5";
+      String[] expectedPlan3 = {"Filter\\(condition.*\\).*rowcount = 1.0,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan3, new String[]{});
 
-    query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
-        + " on emp.department_id = dept.department_id";
-    String[] expectedPlan4 = {"HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
-            "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan4, new String[]{});
+      query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+          + " on emp.department_id = dept.department_id";
+      String[] expectedPlan4 = {"HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+              "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan4, new String[]{});
 
-    query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
-            + " on emp.department_id = dept.department_id where dept.department_id = 5";
-    String[] expectedPlan5 = {"HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
-            "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan5, new String[]{});
+      query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+              + " on emp.department_id = dept.department_id where dept.department_id = 5";
+      String[] expectedPlan5 = {"HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+              "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan5, new String[]{});
 
-    query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
-            + " on emp.department_id = dept.department_id"
-            + " where dept.department_id = 5 and emp.employee_id = 10";
-    String[] expectedPlan6 = {"MergeJoin\\(condition.*\\).*rowcount = 1.0,.*",
-            "Filter\\(condition=\\[AND\\(=\\(\\$1, 10\\), =\\(\\$0, 5\\)\\)\\]\\).*rowcount = 1.0,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
-            "Filter\\(condition=\\[=\\(\\$0, 5\\)\\]\\).*rowcount = 1.0,.*",
-            "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan6, new String[]{});
+      query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+              + " on emp.department_id = dept.department_id"
+              + " where dept.department_id = 5 and emp.employee_id = 10";
+      String[] expectedPlan6 = {"MergeJoin\\(condition.*\\).*rowcount = 1.0,.*",
+              "Filter\\(condition=\\[AND\\(=\\(\\$1, 10\\), =\\(\\$0, 5\\)\\)\\]\\).*rowcount = 1.0,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+              "Filter\\(condition=\\[=\\(\\$0, 5\\)\\]\\).*rowcount = 1.0,.*",
+              "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan6, new String[]{});
 
-    query = " select emp.employee_id, count(*)"
-            + " from dfs.tmp.employeeUseStat emp"
-            + " group by emp.employee_id";
-    String[] expectedPlan7 = {"HashAgg\\(group=\\[\\{0\\}\\], EXPR\\$1=\\[COUNT\\(\\)\\]\\).*rowcount = 1155.0,.*",
-            "Scan.*columns=\\[`employee_id`\\].*rowcount = 1155.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan7, new String[]{});
+      query = " select emp.employee_id, count(*)"
+              + " from dfs.tmp.employeeUseStat emp"
+              + " group by emp.employee_id";
+      String[] expectedPlan7 = {"HashAgg\\(group=\\[\\{0\\}\\], EXPR\\$1=\\[COUNT\\(\\)\\]\\).*rowcount = 1155.0,.*",
+              "Scan.*columns=\\[`employee_id`\\].*rowcount = 1155.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan7, new String[]{});
 
-    query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
-            + " on emp.department_id = dept.department_id "
-            + " group by emp.employee_id";
-    String[] expectedPlan8 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 730.0992454469841,.*",
-            "HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
-            "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan8, new String[]{});
+      query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+              + " on emp.department_id = dept.department_id "
+              + " group by emp.employee_id";
+      String[] expectedPlan8 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 730.0992454469841,.*",
+              "HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+              "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan8, new String[]{});
 
-    query = "select emp.employee_id, dept.department_description"
-            + " from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
-            + " on emp.department_id = dept.department_id "
-            + " group by emp.employee_id, emp.store_id, dept.department_description "
-            + " having dept.department_description = 'FINANCE'";
-    String[] expectedPlan9 = {"HashAgg\\(group=\\[\\{0, 1, 2\\}\\]\\).*rowcount = 60.84160378724867.*",
-            "HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
-            "Filter\\(condition=\\[=\\(\\$1, 'FINANCE'\\)\\]\\).*rowcount = 1.0,.*",
-            "Scan.*columns=\\[`department_id`, `department_description`\\].*rowcount = 12.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan9, new String[]{});
+      query = "select emp.employee_id, dept.department_description"
+              + " from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+              + " on emp.department_id = dept.department_id "
+              + " group by emp.employee_id, emp.store_id, dept.department_description "
+              + " having dept.department_description = 'FINANCE'";
+      String[] expectedPlan9 = {"HashAgg\\(group=\\[\\{0, 1, 2\\}\\]\\).*rowcount = 60.84160378724867.*",
+              "HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
+              "Filter\\(condition=\\[=\\(\\$1, 'FINANCE'\\)\\]\\).*rowcount = 1.0,.*",
+              "Scan.*columns=\\[`department_id`, `department_description`\\].*rowcount = 12.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan9, new String[]{});
 
-    query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
-            + " on emp.department_id = dept.department_id "
-            + " group by emp.employee_id, emp.store_id "
-            + " having emp.store_id = 7";
-    String[] expectedPlan10 = {"HashAgg\\(group=\\[\\{0, 1\\}\\]\\).*rowcount = 29.203969817879365.*",
-            "HashJoin\\(condition.*\\).*rowcount = 46.2,.*",
-            "Filter\\(condition=\\[=\\(\\$2, 7\\)\\]\\).*rowcount = 46.2,.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
-            "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan10, new String[]{});
+      query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
+              + " on emp.department_id = dept.department_id "
+              + " group by emp.employee_id, emp.store_id "
+              + " having emp.store_id = 7";
+      String[] expectedPlan10 = {"HashAgg\\(group=\\[\\{0, 1\\}\\]\\).*rowcount = 29.203969817879365.*",
+              "HashJoin\\(condition.*\\).*rowcount = 46.2,.*",
+              "Filter\\(condition=\\[=\\(\\$2, 7\\)\\]\\).*rowcount = 46.2,.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
+              "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan10, new String[]{});
 
-    query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
-            + " on emp.department_id = dept.department_id "
-            + " group by emp.employee_id "
-            + " having emp.employee_id = 7";
-    String[] expectedPlan11 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 1.0.*",
-            "HashJoin\\(condition.*\\).*rowcount = 1.0,.*",
-            "Filter\\(condition=\\[=\\(\\$1, 7\\)\\]\\).*rowcount = 1.0.*",
-            "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*",
-            "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan11, new String[]{});
+      query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
+              + " on emp.department_id = dept.department_id "
+              + " group by emp.employee_id "
+              + " having emp.employee_id = 7";
+      String[] expectedPlan11 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 1.0.*",
+              "HashJoin\\(condition.*\\).*rowcount = 1.0,.*",
+              "Filter\\(condition=\\[=\\(\\$1, 7\\)\\]\\).*rowcount = 1.0.*",
+              "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*",
+              "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan11, new String[]{});
+    } finally {
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
+    }
   }
 
   @Test
@@ -337,35 +353,41 @@
     test("ALTER SESSION SET `store.format` = 'parquet'");
     test("ALTER SESSION SET `planner.statistics.use` = true");
     final String tmpLocation = "/multilevel/parquet";
-    // copy the data into the temporary location
-    test("DROP TABLE dfs.tmp.parquetStale");
-    test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
-            "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
-    String query = "select count(distinct o_orderkey) from dfs.tmp.parquetStale";
-    verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
-    test("REFRESH TABLE METADATA dfs.tmp.parquetStale");
-    // Verify we recompute statistics once a new file/directory is added. Update the directory some
-    // time after ANALYZE so that the timestamps are different.
-    Thread.sleep(1000);
-    final String Q4 = "/multilevel/parquet/1996/Q4";
-    test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
-            "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
-    // query should use STALE statistics
-    String[] expectedStalePlan = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
-        "Scan.*rowcount = 130.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedStalePlan, new String[]{});
-    // Query should use Parquet Metadata, since statistics not available. In this case, NDV is computed as
-    // 1/10*rowcount (Calcite default). Hence, NDV is 13.0 instead of the correct 119.0
-    test("DROP TABLE dfs.tmp.`parquetStale/.stats.drill`");
-    String[] expectedPlan1 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 13.0.*",
-        "Scan.*rowcount = 130.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
-    // query should use the new statistics. NDV remains unaffected since we copy the Q4 into Q5
-    verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
-    String[] expectedPlan2 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
-        "Scan.*rowcount = 130.0.*"};
-    PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
-    test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
+    try {
+      // copy the data into the temporary location
+      test("DROP TABLE dfs.tmp.parquetStale");
+      test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+              "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
+      String query = "select count(distinct o_orderkey) from dfs.tmp.parquetStale";
+      verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+      test("REFRESH TABLE METADATA dfs.tmp.parquetStale");
+      // Verify we recompute statistics once a new file/directory is added. Update the directory some
+      // time after ANALYZE so that the timestamps are different.
+      Thread.sleep(1000);
+      final String Q4 = "/multilevel/parquet/1996/Q4";
+      test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+              "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
+      // query should use STALE statistics
+      String[] expectedStalePlan = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
+          "Scan.*rowcount = 130.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedStalePlan, new String[]{});
+      // Query should use Parquet Metadata, since statistics not available. In this case, NDV is computed as
+      // 1/10*rowcount (Calcite default). Hence, NDV is 13.0 instead of the correct 119.0
+      test("DROP TABLE dfs.tmp.`parquetStale/.stats.drill`");
+      String[] expectedPlan1 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 13.0.*",
+          "Scan.*rowcount = 130.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
+      // query should use the new statistics. NDV remains unaffected since we copy the Q4 into Q5
+      verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+      String[] expectedPlan2 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
+          "Scan.*rowcount = 130.0.*"};
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
+      test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
+    } finally {
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
+      resetSessionOption("planner.statistics.use");
+    }
   }
 
   // Test basic histogram creation functionality for int, bigint, double, date, timestamp and boolean data types.
@@ -436,9 +458,10 @@
       String[] expectedPlan6 = {"Filter\\(condition.*\\).*rowcount = 1.0,.*",
         "Scan.*columns=\\[`store_id`\\].*rowcount = 1128.0.*"};
       PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan6, new String[]{});
-
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
+      resetSessionOption("planner.statistics.use");
     }
   }
 
@@ -462,7 +485,8 @@
               .baselineValues("`c_acctbal`", 11)
               .go();
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
@@ -498,11 +522,11 @@
               .go();
 
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
-
   @Test
   public void testHistogramWithIntervalPredicate() throws Exception {
     try {
@@ -516,7 +540,8 @@
       String[] expectedPlan1 = {"Filter\\(condition.*\\).*rowcount = 59?.*,.*", "Scan.*columns=\\[`o_orderdate`\\].*rowcount = 15000.0.*"};
       PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
     } finally {
-      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+      resetSessionOption("planner.slice_target");
+      resetSessionOption("store.format");
     }
   }
 
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 29cbe39..e32ecc9 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -579,21 +579,21 @@
     /**
      * Fill in missing values up to, but not including, the given
      * index.
-     * 
+     *
      * @param index the index about to be written, or the total
      * vector length about to be set
      */
-    
+
     @VisibleForTesting
     protected void fillEmpties(int index) {
       values.getMutator().fillEmpties(lastSet, index);
       while (index > bits.getValueCapacity()) {
         bits.reAlloc();
       }
-      
+
       // Set last set to the given index; which the caller
       // will write to
-      
+
       lastSet = index;
     }
 
@@ -753,7 +753,7 @@
       values.getMutator().setValueCount(valueCount);
       bits.getMutator().setValueCount(valueCount);
     }
-    
+
     <#if type.major == "VarLen">
     /** Enables this wrapper container class to participate in bulk mutator logic */
     private final class VarLenBulkInputCallbackImpl implements VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> {
@@ -853,7 +853,7 @@
     public void setSetCount(int n) {
       <#if type.major = "VarLen">lastSet = n - 1;</#if>
     }
-    
+
     // For nullable vectors, exchanging buffers (done elsewhere)
     // requires also exchanging mutator state (done here.)
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
index e4e8cbc..099f113 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
@@ -25,7 +25,11 @@
 
   public interface Mutator extends ValueVector.Mutator {
 
-    // Used by the vector accessors to force the last set value.
+    /**
+     * Used by the vector accessors to force the last set value.
+     *
+     * @param n the value of the last set field used to fill empties
+     */
 
     void setSetCount(int n);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
index 564e784..5830b30 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
@@ -34,7 +34,7 @@
     if (capacity == 0) {
       return;
     }
-    int length = Math.min(maxPrint, capacity);
+    int length = Math.min(maxPrint, vector.getAccessor().getValueCount());
     printOffsets(vector, 0, length);
   }