DRILL-7439: Batch count fixes for six additional operators

Enables vector checks, and fixes batch count and vector issues for:

* StreamingAggBatch
* RuntimeFilterRecordBatch
* FlattenRecordBatch
* MergeJoinBatch
* NestedLoopJoinBatch
* LimitRecordBatch

Also fixes a zero-size batch validity issue for the CSV reader when
all files contain no data.

Includes code cleanup for files touched in this PR.

closes #1893
diff --git a/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java b/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
index f4fc9ab..446825a 100644
--- a/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
+++ b/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
@@ -18,11 +18,9 @@
 package org.apache.drill.exec.util;
 
 public class AssertionUtil {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class);
-
   public static final boolean ASSERT_ENABLED;
 
-  static{
+  static {
     boolean isAssertEnabled = false;
     assert isAssertEnabled = true;
     ASSERT_ENABLED = isAssertEnabled;
@@ -32,6 +30,5 @@
     return ASSERT_ENABLED;
   }
 
-  private AssertionUtil() {
-  }
+  private AssertionUtil() { }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6cec50e..d166353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -31,7 +34,6 @@
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
-
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -40,7 +42,6 @@
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
-
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -55,40 +56,30 @@
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
-
 import org.apache.drill.exec.physical.impl.common.SpilledState;
-import org.apache.drill.exec.record.RecordBatchSizer;
-
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.planner.physical.AggPrelBase;
-
-import org.apache.drill.exec.record.MaterializedField;
-
-import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
-
-import org.apache.drill.exec.record.VectorContainer;
-
-import org.apache.drill.exec.record.TypedFieldId;
-
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
-
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ObjectVector;
 import org.apache.drill.exec.vector.ValueVector;
-
 import org.apache.drill.exec.vector.VariableWidthVector;
-
-import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
-import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class HashAggTemplate implements HashAggregator {
-  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+  protected static final Logger logger = LoggerFactory.getLogger(HashAggregator.class);
 
   private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;
   private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
@@ -98,40 +89,40 @@
   private static final boolean EXTRA_DEBUG_SPILL = false;
 
   // Fields needed for partitioning (the groups into partitions)
-  private int nextPartitionToReturn = 0; // which partition to return the next batch from
+  private int nextPartitionToReturn; // which partition to return the next batch from
   // The following members are used for logging, metrics, etc.
-  private int rowsInPartition = 0; // counts #rows in each partition
-  private int rowsNotSpilled = 0;
-  private int rowsSpilled = 0;
-  private int rowsSpilledReturned = 0;
-  private int rowsReturnedEarly = 0;
+  private int rowsInPartition; // counts #rows in each partition
+  private int rowsNotSpilled;
+  private int rowsSpilled;
+  private int rowsSpilledReturned;
+  private int rowsReturnedEarly;
 
   private AggPrelBase.OperatorPhase phase;
   private boolean canSpill = true; // make it false in case can not spill/return-early
   private ChainedHashTable baseHashTable;
-  private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
-  private int earlyPartition = 0; // which partition to return early
-  private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
-  private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill
-  private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill
-  private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns)
-  private long estValuesRowWidth = 0; // the size of the internal values ( values + extra )
-  private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns)
-  private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM
-  private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
+  private boolean earlyOutput; // when 1st phase returns a partition due to no memory
+  private int earlyPartition; // which partition to return early
+  private boolean retrySameIndex; // in case put failed during 1st phase - need to output early, then retry
+  private boolean useMemoryPrediction; // whether to use memory prediction to decide when to spill
+  private long estMaxBatchSize; // used for adjusting #partitions and deciding when to spill
+  private long estRowWidth; // the size of the internal "row" (keys + values + extra columns)
+  private long estValuesRowWidth; // the size of the internal values ( values + extra )
+  private long estOutputRowWidth; // the size of the output "row" (no extra columns)
+  private long estValuesBatchSize; // used for "reserving" memory for the Values batch to overcome an OOM
+  private long estOutgoingAllocSize; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
   private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch
   private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output
   private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
   private long minBatchesPerPartition; // for tuning - num partitions and spill decision
-  private long plannedBatches = 0; // account for planned, but not yet allocated batches
+  private long plannedBatches; // account for planned, but not yet allocated batches
 
-  private int underlyingIndex = 0;
-  private int currentIndex = 0;
+  private int underlyingIndex;
+  private int currentIndex;
   private IterOutcome outcome;
-  private int numGroupedRecords = 0;
-  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+  private int numGroupedRecords;
+  private int currentBatchRecordCount; // Performance: Avoid repeated calls to getRecordCount()
 
-  private int lastBatchOutputCount = 0;
+  private int lastBatchOutputCount;
   private RecordBatch incoming;
   private BatchSchema schema;
   private HashAggBatch outgoing;
@@ -148,7 +139,7 @@
 
   // For handling spilling
   private HashAggUpdater updater;
-  private SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
+  private final SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
   private SpillSet spillSet;
   SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
   private Writer writers[]; // a vector writer for each spilled partition
@@ -157,17 +148,17 @@
   private int originalPartition = -1; // the partition a secondary reads from
 
   private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
-  private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
+  private int numGroupByOutFields; // Note: this should be <= number of group-by fields
   private TypedFieldId[] groupByOutFieldIds;
 
   private MaterializedField[] materializedValueFields;
-  private boolean allFlushed = false;
-  private boolean buildComplete = false;
-  private boolean handlingSpills = false; // True once starting to process spill files
-  private boolean handleEmit = false; // true after receiving an EMIT, till finish handling it
+  private boolean allFlushed;
+  private boolean buildComplete;
+  private boolean handlingSpills; // True once starting to process spill files
+  private boolean handleEmit; // true after receiving an EMIT, till finish handling it
 
-  private OperatorStats stats = null;
-  private HashTableStats htStats = new HashTableStats();
+  private OperatorStats stats;
+  private final HashTableStats htStats = new HashTableStats();
 
   public enum Metric implements MetricDef {
 
@@ -198,9 +189,9 @@
   }
 
   public class BatchHolder {
-    private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
+    private final VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
     private int maxOccupiedIdx = -1;
-    private int targetBatchRowCount = 0;
+    private int targetBatchRowCount;
 
     public int getTargetBatchRowCount() {
       return targetBatchRowCount;
@@ -1009,11 +1000,7 @@
       this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
 
       // set the value count for outgoing batch value vectors
-      for (VectorWrapper<?> v : outgoing) {
-        v.getValueVector().getMutator().setValueCount(numOutputRecords);
-      }
-
-      outContainer.setRecordCount(numOutputRecords);
+      outContainer.setValueCount(numOutputRecords);
       WritableBatch batch = WritableBatch.getBatchNoHVWrap(numOutputRecords, outContainer, false);
       try {
         writers[part].write(batch, null);
@@ -1067,10 +1054,8 @@
     if ( handleEmit && ( batchHolders == null || batchHolders[0].size() == 0 ) ) {
       lastBatchOutputCount = 0; // empty
       allocateOutgoing(0);
-      for (VectorWrapper<?> v : outgoing) {
-        v.getValueVector().getMutator().setValueCount(0);
-      }
-      outgoing.getContainer().setRecordCount(0);
+      outgoing.getContainer().setValueCount(0);
+
       // When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT
       this.outcome = IterOutcome.EMIT;
       handleEmit = false; // finish handling EMIT
@@ -1184,9 +1169,7 @@
     this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, numPendingOutput);
 
     // set the value count for outgoing batch value vectors
-    for (VectorWrapper<?> v : outgoing) {
-      v.getValueVector().getMutator().setValueCount(numOutputRecords);
-    }
+    outgoing.getContainer().setValueCount(numOutputRecords);
 
     outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 471904f..7d02c99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -17,11 +17,15 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -63,24 +67,22 @@
 import org.apache.drill.exec.vector.UntypedNullHolder;
 import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(StreamingAggBatch.class);
 
   protected StreamingAggregator aggregator;
   protected final RecordBatch incoming;
   private List<BaseWriter.ComplexWriter> complexWriters;
-  //
+
   // Streaming agg can be in (a) a normal pipeline or (b) it may be in a pipeline that is part of a subquery involving
   // lateral and unnest. In case(a), the aggregator proceeds normally until it sees a group change or a NONE. If a
   // group has changed, the aggregated data is sent downstream and the aggregation continues with the next group. If
@@ -100,33 +102,37 @@
   //   Schema change within a Data Set is not supported.
   //
   //   We will define some states for internal management
-  //
-  private boolean done = false;  // END of all data
+
+  private boolean done;          // END of all data
   private boolean first = true;  // Beginning of new data set. True during the build schema phase. False once the first
                                  // call to inner next is made.
-  private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
-                                    // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
-                                    // we send out an empty batch with EMIT.
+  private boolean sendEmit;      // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
+                                 // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
+                                 // we send out an empty batch with EMIT.
   private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next
   private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
-  private int recordCount = 0;  // number of records output in the current data set
+  private int recordCount;         // number of records output in the current data set
 
   private BatchSchema incomingSchema;
 
   /*
-   * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause we need to perform special handling when
-   * the incoming batch is empty. In the case of the empty input into the streaming aggregate we need
-   * to return a single batch with one row. For count we need to return 0 and for all other aggregate
-   * functions like sum, avg etc we need to return an explicit row with NULL. Since we correctly allocate the type of
-   * the outgoing vectors (required for count and nullable for other aggregate functions) all we really need to do
-   * is simply set the record count to be 1 in such cases. For nullable vectors we don't need to do anything because
-   * if we don't set anything the output will be NULL, however for required vectors we explicitly zero out the vector
-   * since we don't zero it out while allocating it.
+   * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause
+   * we need to perform special handling when the incoming batch is empty. In
+   * the case of the empty input into the streaming aggregate we need to return
+   * a single batch with one row. For count we need to return 0 and for all
+   * other aggregate functions like sum, avg etc we need to return an explicit
+   * row with NULL. Since we correctly allocate the type of the outgoing vectors
+   * (required for count and nullable for other aggregate functions) all we
+   * really need to do is simply set the record count to be 1 in such cases. For
+   * nullable vectors we don't need to do anything because if we don't set
+   * anything the output will be NULL, however for required vectors we
+   * explicitly zero out the vector since we don't zero it out while allocating
+   * it.
    *
    * We maintain some state to remember that we have done such special handling.
    */
-  private boolean specialBatchSent = false;
+  private boolean specialBatchSent;
   private static final int SPECIAL_BATCH_COUNT = 1;
 
   // TODO: Needs to adapt to batch sizing rather than hardcoded constant value
@@ -156,7 +162,7 @@
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    return this.container;
+    return container;
   }
 
   @Override
@@ -177,17 +183,18 @@
         break;
     }
 
-    this.incomingSchema = incoming.getSchema();
+    incomingSchema = incoming.getSchema();
     if (!createAggregator()) {
       state = BatchState.DONE;
     }
-    for (final VectorWrapper<?> w : container) {
+    for (VectorWrapper<?> w : container) {
       w.getValueVector().allocateNew();
     }
 
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
+    container.setRecordCount(0);
   }
 
   @Override
@@ -207,6 +214,7 @@
       firstBatchForDataSet = true;
       firstBatchForSchema = false;
       recordCount = 0;
+      container.setEmpty();
       specialBatchSent = false;
       return EMIT;
     }
@@ -379,7 +387,7 @@
   private void allocateComplexWriters() {
     // Allocate the complex writers before processing the incoming batch
     if (complexWriters != null) {
-      for (final BaseWriter.ComplexWriter writer : complexWriters) {
+      for (BaseWriter.ComplexWriter writer : complexWriters) {
         writer.allocate();
       }
     }
@@ -393,8 +401,8 @@
    */
   private void constructSpecialBatch() {
     int exprIndex = 0;
-    for (final VectorWrapper<?> vw: container) {
-      final ValueVector vv = vw.getValueVector();
+    for (VectorWrapper<?> vw: container) {
+      ValueVector vv = vw.getValueVector();
       AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT);
       vv.getMutator().setValueCount(SPECIAL_BATCH_COUNT);
       if (vv.getField().getType().getMode() == TypeProtos.DataMode.REQUIRED) {
@@ -442,7 +450,7 @@
     }
   }
 
-  public void addComplexWriter(final BaseWriter.ComplexWriter writer) {
+  public void addComplexWriter(BaseWriter.ComplexWriter writer) {
     complexWriters.add(writer);
   }
 
@@ -460,21 +468,21 @@
     ErrorCollector collector = new ErrorCollectorImpl();
 
     for (int i = 0; i < keyExprs.length; i++) {
-      final NamedExpression ne = popConfig.getKeys().get(i);
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
+      NamedExpression ne = popConfig.getKeys().get(i);
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
       if (expr == null) {
         continue;
       }
       keyExprs[i] = expr;
-      final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
+      MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
                                                                       expr.getMajorType());
-      final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+      ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       keyOutputIds[i] = container.add(vector);
     }
 
     for (int i = 0; i < valueExprs.length; i++) {
-      final NamedExpression ne = popConfig.getExprs().get(i);
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+      NamedExpression ne = popConfig.getExprs().get(i);
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
       if (expr instanceof IfExpression) {
         throw UserException.unsupportedError(new UnsupportedOperationException("Union type not supported in aggregate functions")).build(logger);
       }
@@ -498,7 +506,7 @@
         container.add(new UntypedNullVector(field, container.getAllocator()));
         valueExprs[i] = expr;
       } else {
-        final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
+        MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
             expr.getMajorType());
         ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
         TypedFieldId id = container.add(vector);
@@ -532,40 +540,41 @@
 
   protected void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
     cg.setMappingSet(IS_SAME_I1);
-    for (final LogicalExpression expr : keyExprs) {
+    for (LogicalExpression expr : keyExprs) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       cg.setMappingSet(IS_SAME_I1);
-      final HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
       cg.setMappingSet(IS_SAME_I2);
-      final HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
 
-      final LogicalExpression fh =
+      LogicalExpression fh =
           FunctionGenerationHelper
           .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
-      final HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
       cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
     cg.getEvalBlock()._return(JExpr.TRUE);
   }
 
-  private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null); // the internal batch changes each time so we need to redo setup.
+  // the internal batch changes each time so we need to redo setup.
+  private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null);
   private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSamePrev", null, null);
   private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
   private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
 
   protected void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
     cg.setMappingSet(ISA_B1);
-    for (final LogicalExpression expr : keyExprs) {
+    for (LogicalExpression expr : keyExprs) {
       // first, we rewrite the evaluation stack for each side of the comparison.
       cg.setMappingSet(ISA_B1);
-      final HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
       cg.setMappingSet(ISA_B2);
-      final HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
 
-      final LogicalExpression fh =
+      LogicalExpression fh =
           FunctionGenerationHelper
           .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
-      final HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
       cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
     cg.getEvalBlock()._return(JExpr.TRUE);
@@ -577,7 +586,7 @@
 
   protected void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
     cg.setMappingSet(EVAL);
-    for (final LogicalExpression ex : valueExprs) {
+    for (LogicalExpression ex : valueExprs) {
       cg.addExpr(ex);
     }
   }
@@ -601,11 +610,13 @@
     cg.setMappingSet(RECORD_KEYS_PREV);
 
     for (int i = 0; i < keyExprs.length; i++) {
-      // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same.  This is possible because InternalBatch guarantees this.
+      // IMPORTANT: there is an implicit assertion here that the TypedFieldIds
+      // for the previous batch and the current batch are the same. This is
+      // possible because InternalBatch guarantees this.
       logger.debug("Writing out expr {}", keyExprs[i]);
       cg.rotateBlock();
       cg.setMappingSet(RECORD_KEYS_PREV);
-      final HoldingContainer innerExpression = cg.addExpr(keyExprs[i], ClassGenerator.BlkCreateMode.FALSE);
+      HoldingContainer innerExpression = cg.addExpr(keyExprs[i], ClassGenerator.BlkCreateMode.FALSE);
       cg.setMappingSet(RECORD_KEYS_PREV_OUT);
       cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), ClassGenerator.BlkCreateMode.FALSE);
     }
@@ -655,11 +666,6 @@
   }
 
   @Override
-  public void close() {
-    super.close();
-  }
-
-  @Override
   protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index cc89f23..173ed6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -17,6 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -25,32 +30,29 @@
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class StreamingAggTemplate implements StreamingAggregator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
+  private static final Logger logger = LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
   private int maxOutputRows = ValueVector.MAX_ROW_COUNT;
 
   // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
-  private IterOutcome lastOutcome = null;
+  private IterOutcome lastOutcome;
 
   // First batch after build schema phase
   private boolean first = true;
-  private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA.
+  private boolean firstBatchForSchema; // true if the current batch came in with an OK_NEW_SCHEMA.
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
 
-  private boolean newSchema = false;
+  private boolean newSchema;
 
   // End of all data
-  private boolean done = false;
+  private boolean done;
 
   // index in the incoming (sv4/sv2/vector)
-  private int underlyingIndex = 0;
+  private int underlyingIndex;
   // The indexes below refer to the actual record indexes in input batch
   // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
   private int previousIndex = -1;  // the last index that has been processed. Initialized to -1 every time a new
@@ -59,19 +61,18 @@
   /**
    * Number of records added to the current aggregation group.
    */
-  private long addedRecordCount = 0;
+  private long addedRecordCount;
   // There are two outcomes from the aggregator. One is the aggregator's outcome defined in
   // StreamingAggregator.AggOutcome. The other is the outcome from the last call to incoming.next
   private IterOutcome outcome;
   // Number of aggregation groups added into the output batch
-  private int outputCount = 0;
+  private int outputCount;
   private RecordBatch incoming;
   // the Streaming Agg Batch that this aggregator belongs to
   private StreamingAggBatch outgoing;
 
   private OperatorContext context;
 
-
   @Override
   public void setup(OperatorContext context, RecordBatch incoming,
                     StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException {
@@ -166,7 +167,6 @@
         }
       }
 
-
       if (newSchema) {
         return AggOutcome.UPDATE_AGGREGATOR;
       }
@@ -350,7 +350,6 @@
         first = false;
       }
     }
-
   }
 
   @Override
@@ -432,11 +431,9 @@
     } else {
       outcomeToReturn = OK;
     }
-    this.outcome = outcomeToReturn;
+    outcome = outcomeToReturn;
 
-    for (VectorWrapper<?> v : outgoing) {
-      v.getValueVector().getMutator().setValueCount(outputCount);
-    }
+    outgoing.getContainer().setValueCount(outputCount);
     return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME;
   }
 
@@ -455,11 +452,9 @@
     } else {
       outcomeToReturn = EMIT;
     }
-    this.outcome = outcomeToReturn;
+    outcome = outcomeToReturn;
 
-    for (VectorWrapper<?> v : outgoing) {
-      v.getValueVector().getMutator().setValueCount(outputCount);
-    }
+    outgoing.getContainer().setValueCount(outputCount);
     return AggOutcome.RETURN_AND_RESET;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index be3c51e..0257abc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,12 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.common;
 
-import com.carrotsearch.hppc.IntArrayList;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.cache.VectorSerializer;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -47,16 +55,12 @@
 import org.apache.drill.exec.vector.ObjectVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
+import com.carrotsearch.hppc.IntArrayList;
 
 /**
  * <h2>Overview</h2>
@@ -72,14 +76,14 @@
  *  </p>
  */
 public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class);
+  static final Logger logger = LoggerFactory.getLogger(HashPartition.class);
 
   public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$";
 
   private int partitionNum = -1; // the current number of this partition, as used by the operator
 
   private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
-  private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+  private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
 
   public static final MajorType HVtype = MajorType.newBuilder()
     .setMinorType(MinorType.INT /* dataType */ )
@@ -93,7 +97,7 @@
 
   // While build data is incoming - temporarily keep the list of in-memory
   // incoming batches, per each partition (these may be spilled at some point)
-  private List<VectorContainer> tmpBatchesList;
+  private final List<VectorContainer> tmpBatchesList;
   // A batch and HV vector to hold incoming rows - per each partition
   private VectorContainer currentBatch; // The current (newest) batch
   private IntVector currHVVector; // The HV vectors for the currentBatches
@@ -112,21 +116,21 @@
   private int partitionBatchesCount; // count number of batches spilled
   private String spillFile;
 
-  private BufferAllocator allocator;
+  private final BufferAllocator allocator;
   private int recordsPerBatch;
-  private SpillSet spillSet;
+  private final SpillSet spillSet;
   private boolean isSpilled; // is this partition spilled ?
   private boolean processingOuter; // is (inner done spilling and) now the outer is processed?
   private boolean outerBatchAllocNotNeeded; // when the inner is whole in memory
-  private RecordBatch buildBatch;
-  private RecordBatch probeBatch;
-  private int cycleNum;
-  private int numPartitions;
-  private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
+  private final RecordBatch buildBatch;
+  private final RecordBatch probeBatch;
+  private final int cycleNum;
+  private final int numPartitions;
+  private final List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
   private long partitionInMemorySize;
   private long numInMemoryRecords;
-  private boolean updatedRecordsPerBatch = false;
-  private boolean semiJoin;
+  private boolean updatedRecordsPerBatch;
+  private final boolean semiJoin;
 
   public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
                        RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin,
@@ -188,7 +192,7 @@
 
     try {
       while (vci.hasNext()) {
-        VectorWrapper vw = vci.next();
+        VectorWrapper<?> vw = vci.next();
         // If processing a spilled container, skip the last column (HV)
         if ( cycleNum > 0 && ! vci.hasNext() ) { break; }
         ValueVector vv = vw.getValueVector();
@@ -254,9 +258,11 @@
   public void completeAnOuterBatch(boolean toInitialize) {
     completeABatch(toInitialize, true);
   }
+
   public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
     completeABatch(toInitialize, needsSpill);
   }
+
   /**
    *     A current batch is full (or no more rows incoming) - complete processing this batch
    * I.e., add it to its partition's tmp list, if needed - spill that list, and if needed -
@@ -349,15 +355,13 @@
     numInMemoryRecords = 0L;
     inMemoryBatchStats.clear();
 
-    while ( tmpBatchesList.size() > 0 ) {
+    while (tmpBatchesList.size() > 0) {
       VectorContainer vc = tmpBatchesList.remove(0);
 
       int numRecords = vc.getRecordCount();
 
       // set the value count for outgoing batch value vectors
-      for (VectorWrapper<?> v : vc) {
-        v.getValueVector().getMutator().setValueCount(numRecords);
-      }
+      vc.setValueCount(numRecords);
 
       WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
       try {
@@ -381,6 +385,7 @@
   public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException {
     return hashTable.probeForKey(recordsProcessed, hashCode);
   }
+
   public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
     /* The current probe record has a key that matches. Get the index
      * of the first row in the build side that matches the current key
@@ -393,16 +398,20 @@
     boolean matchExists = hjHelper.setRecordMatched(compositeIndex);
     return Pair.of(compositeIndex, matchExists);
   }
+
   public int getNextIndex(int compositeIndex) {
     // in case of inner rows with duplicate keys, get the next one
     return hjHelper.getNextIndex(compositeIndex);
   }
+
   public boolean setRecordMatched(int compositeIndex) {
     return hjHelper.setRecordMatched(compositeIndex);
   }
+
   public IntArrayList getNextUnmatchedIndex() {
     return hjHelper.getNextUnmatchedIndex();
   }
+
   //
   // =====================================================================================
   //
@@ -410,9 +419,11 @@
   public int getBuildHashCode(int ind) throws SchemaChangeException {
     return hashTable.getBuildHashCode(ind);
   }
+
   public int getProbeHashCode(int ind) throws SchemaChangeException {
     return hashTable.getProbeHashCode(ind);
   }
+
   public ArrayList<VectorContainer> getContainers() {
     return containers;
   }
@@ -457,6 +468,7 @@
   public int getPartitionBatchesCount() {
     return partitionBatchesCount;
   }
+
   public int getPartitionNum() {
     return partitionNum;
   }
@@ -468,6 +480,7 @@
     closeWriterInternal(false);
     processingOuter = true; // After the spill file was closed
   }
+
   /**
    * If exists - close the writer for this partition
    *
@@ -601,4 +614,4 @@
     return String.format("[hashTable = %s]",
       hashTable == null ? "None": hashTable.makeDebugString());
   }
-} // class HashPartition
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index ac6718c..28de51f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -17,6 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.filter;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -39,41 +46,39 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.work.filter.BloomFilter;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates
- * in the HashJoinBatch and can be applied by a RuntimeFilter, it will generate a filtered
- * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 which will not affect
- * the Query's performance ,but just do a memory transfer by the later RemovingRecordBatch op.
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch
+ * participates in the HashJoinBatch and can be applied by a RuntimeFilter, it
+ * will generate a filtered SV2, otherwise will generate a same
+ * recordCount-originalRecordCount SV2 which will not affect the Query's
+ * performance ,but just do a memory transfer by the later RemovingRecordBatch
+ * op.
  */
 public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeFilterPOP> {
-  private SelectionVector2 sv2;
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
 
+  private SelectionVector2 sv2;
   private ValueVectorHashHelper.Hash64 hash64;
-  private Map<String, Integer> field2id = new HashMap<>();
+  private final Map<String, Integer> field2id = new HashMap<>();
   private List<String> toFilterFields;
   private List<BloomFilter> bloomFilters;
   private RuntimeFilterWritable current;
   private int originalRecordCount;
-  private long filteredRows = 0l;
-  private long appliedTimes = 0l;
-  private int batchTimes = 0;
-  private boolean waited = false;
-  private boolean enableRFWaiting;
-  private long maxWaitingTime;
-  private long rfIdentifier;
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+  private long filteredRows;
+  private long appliedTimes;
+  private int batchTimes;
+  private boolean waited;
+  private final boolean enableRFWaiting;
+  private final long maxWaitingTime;
+  private final long rfIdentifier;
 
   public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
-    enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val;
-    maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val;
+    enableRFWaiting = context.getOptions().getBoolean(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY);
+    maxWaitingTime = context.getOptions().getLong(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY);
     this.rfIdentifier = pop.getIdentifier();
   }
 
@@ -107,6 +112,7 @@
       throw new UnsupportedOperationException(e);
     }
     container.transferIn(incoming.getContainer());
+    container.setRecordCount(originalRecordCount);
     updateStats();
     return getFinalOutcome(false);
   }
@@ -258,7 +264,6 @@
       }
     }
 
-
     appliedTimes++;
     sv2.setRecordCount(svIndex);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 7877c6b..cee7625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -43,16 +43,16 @@
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.FlattenPOP;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.ValueVector;
@@ -60,20 +60,23 @@
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.carrotsearch.hppc.IntHashSet;
 import com.sun.codemodel.JExpr;
 
-// TODO - handle the case where a user tries to flatten a scalar, should just act as a project all of the columns exactly
-// as they come in
+// TODO - handle the case where a user tries to flatten a scalar, should
+// just act as a project all of the columns exactly as they come in
+
 public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(FlattenRecordBatch.class);
 
   private Flattener flattener;
   private List<ValueVector> allocationVectors;
   private List<ComplexWriter> complexWriters;
-  private boolean hasRemainder = false;
-  private int remainderIndex = 0;
+  private boolean hasRemainder;
+  private int remainderIndex;
   private int recordCount;
   private final FlattenMemoryManager flattenMemoryManager;
 
@@ -81,7 +84,7 @@
     @Override
     public int getBufferSizeFor(int recordCount) {
       int bufferSize = 0;
-      for(final ValueVector vv : allocationVectors) {
+      for (ValueVector vv : allocationVectors) {
         bufferSize += vv.getBufferSizeFor(recordCount);
       }
       return bufferSize;
@@ -98,7 +101,7 @@
         outputNames.clear();
       }
 
-      // note:  don't clear the internal maps since they have cumulative data..
+      // note:  don't clear the internal maps since they have cumulative data.
     }
   }
 
@@ -129,28 +132,28 @@
       // Get sizing information for the batch.
       setRecordBatchSizer(new RecordBatchSizer(incoming));
 
-      final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
-      final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+      TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+      MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
 
       // Get column size of flatten column.
       RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName());
 
       // Average rowWidth of flatten column
-      final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
+      int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
 
       // Average rowWidth excluding the flatten column.
-      final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
+      int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
 
       // Average rowWidth of single element in the flatten list.
       // subtract the offset vector size from column data size.
-      final int avgRowWidthSingleFlattenEntry =
+      int avgRowWidthSingleFlattenEntry =
         RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()),
           columnSize.getElementCount());
 
       // Average rowWidth of outgoing batch.
-      final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
+      int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
 
-      final int outputBatchSize = getOutputBatchSize();
+      int outputBatchSize = getOutputBatchSize();
       // Number of rows in outgoing batch
       setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
@@ -182,14 +185,12 @@
     return recordCount;
   }
 
-
   @Override
   protected void killIncoming(boolean sendUpstream) {
     super.killIncoming(sendUpstream);
     hasRemainder = false;
   }
 
-
   @Override
   public IterOutcome innerNext() {
     if (hasRemainder) {
@@ -202,14 +203,14 @@
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    return this.container;
+    return container;
   }
 
   private void setFlattenVector() {
-    final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
-    final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
-    final RepeatedValueVector vector;
-    final ValueVector inVV = incoming.getValueAccessorById(
+    TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+    MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+    RepeatedValueVector vector;
+    ValueVector inVV = incoming.getValueAccessorById(
         field.getValueClass(), typedFieldId.getFieldIds()).getValueVector();
 
     if (! (inVV instanceof RepeatedValueVector)) {
@@ -232,10 +233,7 @@
 
     int incomingRecordCount = incoming.getRecordCount();
 
-    if (!doAlloc(flattenMemoryManager.getOutputRowCount())) {
-      outOfMemory = true;
-      return IterOutcome.OUT_OF_MEMORY;
-    }
+    doAlloc(flattenMemoryManager.getOutputRowCount());
 
     // we call this in setupSchema, but we also need to call it here so we have a reference to the appropriate vector
     // inside of the the flattener for the current batch
@@ -244,18 +242,13 @@
     int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
     int outputRecords = childCount == 0 ? 0: flattener.flattenRecords(incomingRecordCount, 0, monitor);
     // TODO - change this to be based on the repeated vector length
+    setValueCount(outputRecords);
     if (outputRecords < childCount) {
-      setValueCount(outputRecords);
       hasRemainder = true;
       remainderIndex = outputRecords;
-      this.recordCount = remainderIndex;
     } else {
-      setValueCount(outputRecords);
       flattener.resetGroupIndex();
-      for(VectorWrapper<?> v: incoming) {
-        v.clear();
-      }
-      this.recordCount = outputRecords;
+      VectorAccessibleUtilities.clear(incoming.getContainer());
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
@@ -276,25 +269,18 @@
 
     // remainingRecordCount can be much higher than number of rows we will have in outgoing batch.
     // Do memory allocation only for number of rows we are going to have in the batch.
-    if (!doAlloc(Math.min(remainingRecordCount, flattenMemoryManager.getOutputRowCount()))) {
-      outOfMemory = true;
-      return;
-    }
+    doAlloc(Math.min(remainingRecordCount, flattenMemoryManager.getOutputRowCount()));
 
     int projRecords = flattener.flattenRecords(remainingRecordCount, 0, monitor);
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);
-      this.recordCount = projRecords;
       remainderIndex += projRecords;
     } else {
       setValueCount(remainingRecordCount);
       hasRemainder = false;
       remainderIndex = 0;
-      for (VectorWrapper<?> v : incoming) {
-        v.clear();
-      }
+      VectorAccessibleUtilities.clear(incoming.getContainer());
       flattener.resetGroupIndex();
-      this.recordCount = remainingRecordCount;
     }
     // In case of complex writer expression, vectors would be added to batch run-time.
     // We have to re-build the schema.
@@ -303,45 +289,38 @@
     }
 
     flattenMemoryManager.updateOutgoingStats(projRecords);
-
   }
 
   public void addComplexWriter(ComplexWriter writer) {
     complexWriters.add(writer);
   }
 
-  private boolean doAlloc(int recordCount) {
-
-    for (ValueVector v : this.allocationVectors) {
+  private void doAlloc(int recordCount) {
+    for (ValueVector v : allocationVectors) {
       // This will iteratively allocate memory for nested columns underneath.
       RecordBatchSizer.ColumnSize colSize = flattenMemoryManager.getColumnSize(v.getField().getName());
       colSize.allocateVector(v, recordCount);
     }
 
-    //Allocate vv for complexWriters.
-    if (complexWriters == null) {
-      return true;
+    // Allocate vv for complexWriters.
+    if (complexWriters != null) {
+      for (ComplexWriter writer : complexWriters) {
+        writer.allocate();
+      }
     }
-
-    for (ComplexWriter writer : complexWriters) {
-      writer.allocate();
-    }
-
-    return true;
   }
 
   private void setValueCount(int count) {
-    for (ValueVector v : allocationVectors) {
-      ValueVector.Mutator m = v.getMutator();
-      m.setValueCount(count);
+    recordCount = count;
+    if (count == 0) {
+      container.setEmpty();
+    } else {
+      container.setValueCount(count);
     }
-
-    if (complexWriters == null) {
-      return;
-    }
-
-    for (ComplexWriter writer : complexWriters) {
-      writer.setValueCount(count);
+    if (complexWriters != null) {
+      for (ComplexWriter writer : complexWriters) {
+        writer.setValueCount(count);
+      }
     }
   }
 
@@ -350,33 +329,40 @@
   }
 
   /**
-   * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
-   * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
-   * represent the lists. As the data layout for the actual values in the same in the repeated vector as in the
-   * scalar vector of the same type, we can avoid making individual copies for the column being flattened, and just
-   * use vector copies between the inner vector of the repeated field to the resulting scalar vector from the flatten
-   * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
-   * the end of one of the other vectors while we are copying the data of the other vectors alongside each new flattened
-   * value coming out of the repeated field.)
+   * The data layout is the same for the actual data within a repeated field, as
+   * it is in a scalar vector for the same sql type. For example, a repeated int
+   * vector has a vector of offsets into a regular int vector to represent the
+   * lists. As the data layout for the actual values in the same in the repeated
+   * vector as in the scalar vector of the same type, we can avoid making
+   * individual copies for the column being flattened, and just use vector
+   * copies between the inner vector of the repeated field to the resulting
+   * scalar vector from the flatten operation. This is completed after we
+   * determine how many records will fit (as we will hit either a batch end, or
+   * the end of one of the other vectors while we are copying the data of the
+   * other vectors alongside each new flattened value coming out of the repeated
+   * field.)
    */
   private TransferPair getFlattenFieldTransferPair(FieldReference reference) {
-    final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
-    final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
-    final ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
+    TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
+    Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
+    ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
 
     TransferPair tp = null;
     if (flattenField instanceof AbstractRepeatedMapVector) {
-      tp = ((AbstractRepeatedMapVector) flattenField).getTransferPairToSingleMap(reference.getAsNamePart().getName(), oContext.getAllocator());
-    } else if ( !(flattenField instanceof RepeatedValueVector) ) {
+      tp = ((AbstractRepeatedMapVector) flattenField).getTransferPairToSingleMap(
+          reference.getAsNamePart().getName(), oContext.getAllocator());
+    } else if (!(flattenField instanceof RepeatedValueVector)) {
       if(incoming.getRecordCount() != 0) {
-        throw UserException.unsupportedError().message("Flatten does not support inputs of non-list values.").build(logger);
+        throw UserException.unsupportedError().message(
+            "Flatten does not support inputs of non-list values.").build(logger);
       }
       logger.error("Cannot cast {} to RepeatedValueVector", flattenField);
       //when incoming recordCount is 0, don't throw exception since the type being seen here is not solid
-      final ValueVector vv = new RepeatedMapVector(flattenField.getField(), oContext.getAllocator(), null);
-      tp = RepeatedValueVector.class.cast(vv).getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
+      ValueVector vv = new RepeatedMapVector(flattenField.getField(), oContext.getAllocator(), null);
+      tp = RepeatedValueVector.class.cast(vv).getTransferPair(
+          reference.getAsNamePart().getName(), oContext.getAllocator());
     } else {
-      final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
+      ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
       // vvIn may be null because of fast schema return for repeated list vectors
       if (vvIn != null) {
         tp = vvIn.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
@@ -387,28 +373,32 @@
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    this.allocationVectors = new ArrayList<>();
+    allocationVectors = new ArrayList<>();
     container.clear();
-    final List<NamedExpression> exprs = getExpressionList();
-    final ErrorCollector collector = new ErrorCollectorImpl();
-    final List<TransferPair> transfers = new ArrayList<>();
+    List<NamedExpression> exprs = getExpressionList();
+    ErrorCollector collector = new ErrorCollectorImpl();
+    List<TransferPair> transfers = new ArrayList<>();
 
-    final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions());
+    ClassGenerator<Flattener> cg = CodeGenerator.getRoot(
+        Flattener.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
-    final IntHashSet transferFieldIds = new IntHashSet();
+    IntHashSet transferFieldIds = new IntHashSet();
 
-    final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
-    final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
-    final FieldReference fieldReference = flattenExpr.getRef();
-    final TransferPair transferPair = getFlattenFieldTransferPair(fieldReference);
+    NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(),
+        new FieldReference(popConfig.getColumn()));
+    ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(
+        flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+    FieldReference fieldReference = flattenExpr.getRef();
+    TransferPair transferPair = getFlattenFieldTransferPair(fieldReference);
 
     if (transferPair != null) {
-      final ValueVector flattenVector = transferPair.getTo();
+      ValueVector flattenVector = transferPair.getTo();
 
       // checks that list has only default ValueVector and replaces resulting ValueVector to INT typed ValueVector
       if (exprs.size() == 0 && flattenVector.getField().getType().equals(Types.LATE_BIND_TYPE)) {
-        final MaterializedField outputField = MaterializedField.create(fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT);
-        final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+        MaterializedField outputField = MaterializedField.create(
+            fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT);
+        ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
 
         container.add(vector);
       } else {
@@ -435,9 +425,11 @@
         }
       }
 
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+      LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(),
+          incoming, collector, context.getFunctionRegistry(), true);
       if (collector.hasErrors()) {
-        throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+        throw new SchemaChangeException(String.format(
+            "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
       }
       if (expr instanceof DrillFuncHolderExpr &&
           ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
@@ -452,10 +444,11 @@
         cg.addExpr(expr);
       } else {
         // need to do evaluation.
-        final MaterializedField outputField;
+        MaterializedField outputField;
         if (expr instanceof ValueVectorReadExpression) {
-          final TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId();
-          final ValueVector incomingVector = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+          TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId();
+          ValueVector incomingVector = incoming.getValueAccessorById(
+              id.getIntermediateClass(), id.getFieldIds()).getValueVector();
           // outputField is taken from the incoming schema to avoid the loss of nested fields
           // when the first batch will be empty.
           if (incomingVector != null) {
@@ -466,7 +459,7 @@
         } else {
           outputField = MaterializedField.create(outputName, expr.getMajorType());
         }
-        final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+        ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
         allocationVectors.add(vector);
         TypedFieldId fid = container.add(vector);
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
@@ -482,7 +475,7 @@
     container.buildSchema(SelectionVectorMode.NONE);
 
     try {
-      this.flattener = context.getImplementationClass(cg.getCodeGenerator());
+      flattener = context.getImplementationClass(cg.getCodeGenerator());
       flattener.setup(context, incoming, this, transfers);
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 7c59b4d..ef477b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -17,8 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.sun.codemodel.JClass;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
@@ -74,7 +76,7 @@
  */
 public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(MergeJoinBatch.class);
 
   private final MappingSet setupMapping =
     new MappingSet("null", "null",
@@ -177,6 +179,7 @@
     }
 
     allocateBatch(true);
+    container.setEmpty();
   }
 
   @Override
@@ -269,11 +272,7 @@
   }
 
   private void setRecordCountInContainer() {
-    for (VectorWrapper vw : container) {
-      Preconditions.checkArgument(!vw.isHyper());
-      vw.getValueVector().getMutator().setValueCount(getRecordCount());
-    }
-
+    container.setValueCount(getRecordCount());
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     batchMemoryManager.updateOutgoingStats(getRecordCount());
   }
@@ -487,7 +486,7 @@
     // Allocate memory for the vectors.
     // This will iteratively allocate memory for all nested columns underneath.
     int outputRowCount = batchMemoryManager.getOutputRowCount();
-    for (VectorWrapper w : container) {
+    for (VectorWrapper<?> w : container) {
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), outputRowCount);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 7513ebd..6b7edd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -23,6 +23,8 @@
 import java.util.Map;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -66,23 +68,23 @@
  * RecordBatch implementation for the nested loop join operator
  */
 public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(NestedLoopJoinBatch.class);
 
   // Input indexes to correctly update the stats
   protected static final int LEFT_INPUT = 0;
   protected static final int RIGHT_INPUT = 1;
 
   // Schema on the left side
-  private BatchSchema leftSchema = null;
+  private BatchSchema leftSchema;
 
   // Schema on the right side
-  private BatchSchema rightSchema = null;
+  private BatchSchema rightSchema;
 
   // Runtime generated class implementing the NestedLoopJoin interface
-  private NestedLoopJoin nljWorker = null;
+  private NestedLoopJoin nljWorker;
 
   // Number of output records in the current outgoing batch
-  private int outputRecords = 0;
+  private int outputRecords;
 
   // We accumulate all the batches on the right side in a hyper container.
   private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
@@ -198,12 +200,7 @@
     outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
 
     // Set the record count
-    for (final VectorWrapper<?> vw : container) {
-      vw.getValueVector().getMutator().setValueCount(outputRecords);
-    }
-
-    // Set the record count in the container
-    container.setRecordCount(outputRecords);
+    container.setValueCount(outputRecords);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
@@ -413,6 +410,7 @@
         batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
 
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      container.setEmpty();
 
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException(e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index bb49187..e9d7dd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -32,12 +32,14 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -122,7 +124,7 @@
     container.clear();
     transfers.clear();
 
-    for(final VectorWrapper<?> v : incoming) {
+    for (final VectorWrapper<?> v : incoming) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
         container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
@@ -130,7 +132,7 @@
 
     final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
 
-    switch(svMode) {
+    switch (svMode) {
       case NONE:
         break;
       case TWO_BYTE:
@@ -174,10 +176,11 @@
     final int inputRecordCount = incoming.getRecordCount();
     if (inputRecordCount == 0) {
       setOutgoingRecordCount(0);
+      container.setEmpty();
       return getFinalOutcome(false);
     }
 
-    for(final TransferPair tp : transfers) {
+    for (final TransferPair tp : transfers) {
       tp.transfer();
     }
     // Check if current input record count is less than start offset. If yes then adjust the start offset since we
@@ -185,6 +188,7 @@
     if (inputRecordCount <= recordStartOffset) {
       recordStartOffset -= inputRecordCount;
       setOutgoingRecordCount(0);
+      container.setEmpty();
     } else {
       // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
       // batch to output record batch and later an SV2Remover copies the needed records.
@@ -194,7 +198,9 @@
 
     // clear memory for incoming sv (if any)
     if (incomingSv != null) {
-      outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount());
+      int incomingCount = incomingSv.getBatchActualRecordCount();
+      outgoingSv.setBatchActualRecordCount(incomingCount);
+      container.setRecordCount(incomingCount);
       incomingSv.clear();
     }
 
@@ -218,7 +224,7 @@
     }
 
     int svIndex = 0;
-    for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
+    for (int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
       if (incomingSv != null) {
         outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
       } else {
@@ -226,12 +232,17 @@
       }
     }
     outgoingSv.setRecordCount(svIndex);
+    outgoingSv.setBatchActualRecordCount(inputRecordCount);
+    // Actual number of values in the container; not the number in
+    // the SV.
+    container.setRecordCount(inputRecordCount);
     // Update the start offset
     recordStartOffset = 0;
   }
 
   private void setOutgoingRecordCount(int outputCount) {
     outgoingSv.setRecordCount(outputCount);
+    outgoingSv.setBatchActualRecordCount(outputCount);
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 639a25a..07f5069 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -32,6 +32,8 @@
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Modular implementation of the standard Drill record batch iterator
@@ -51,7 +53,7 @@
  */
 
 public class OperatorRecordBatch implements CloseableRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(OperatorRecordBatch.class);
 
   private final OperatorDriver driver;
   private final BatchAccessor batchAccessor;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index 496f326..866b988 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -21,6 +21,8 @@
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.accessor.InvalidConversionError;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manages a row batch reader through its lifecycle. Created when the reader
@@ -128,7 +130,7 @@
  */
 
 class ReaderState {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderState.class);
+  static final Logger logger = LoggerFactory.getLogger(ReaderState.class);
 
   private enum State {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index 701c82b..53664af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -25,6 +25,8 @@
 import org.apache.drill.exec.physical.impl.protocol.OperatorExec;
 import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of the revised scan operator that uses a mutator aware of
@@ -155,7 +157,7 @@
     CLOSED
   }
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanOperatorExec.class);
+  static final Logger logger = LoggerFactory.getLogger(ScanOperatorExec.class);
 
   private final ScanOperatorEvents factory;
   private final boolean allowEmptyResult;
@@ -295,6 +297,13 @@
     if (allowEmptyResult &&
         containerAccessor.batchCount() == 0 &&
         containerAccessor.schemaVersion() > 0) {
+
+      // We've exhausted all readers, none had data, but at least
+      // one had a schema. Any zero-sized batch produced by a reader
+      // was cleared when closing the reader. Recreate a valid empty
+      // batch here to return downstream.
+
+      containerAccessor.getOutgoingContainer().setEmpty();
       state = State.EMPTY;
     } else {
       state = State.END;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index 712d21c..095372c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -23,6 +23,8 @@
 import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
 import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Basic scan framework for a "managed" reader which uses the scan schema
@@ -112,7 +114,7 @@
 
 public class ManagedScanFramework implements ScanOperatorEvents {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ManagedScanFramework.class);
+  static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class);
 
   /**
    * Creates a batch reader on demand. Unlike earlier versions of Drill,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
index 5fa2187..8628873 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -202,10 +202,10 @@
 
     public AbstractMapVector buildMap() {
       if (parentColumn.sourceIndex() != -1) {
-        final ResolvedTuple parentTuple = parentColumn.parent();
+        ResolvedTuple parentTuple = parentColumn.parent();
         inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex());
       }
-      final MaterializedField colSchema = parentColumn.schema();
+      MaterializedField colSchema = parentColumn.schema();
       outputMap = createMap(inputMap,
           MaterializedField.create(
               colSchema.getName(), colSchema.getType()),
@@ -275,8 +275,8 @@
       // Create a new map array, reusing the offset vector from
       // the original input map.
 
-      final RepeatedMapVector source = (RepeatedMapVector) inputMap;
-      final UInt4Vector offsets = source.getOffsetVector();
+      RepeatedMapVector source = (RepeatedMapVector) inputMap;
+      UInt4Vector offsets = source.getOffsetVector();
       valueCount = offsets.getAccessor().getValueCount();
       return new RepeatedMapVector(schema,
           offsets, null);
@@ -342,7 +342,7 @@
       nullBuilder.build(vectorCache);
     }
     if (children != null) {
-      for (final ResolvedTuple child : children) {
+      for (ResolvedTuple child : children) {
         child.buildNulls(vectorCache.childCache(child.name()));
       }
     }
@@ -353,7 +353,7 @@
       nullBuilder.load(rowCount);
     }
     if (children != null) {
-      for (final ResolvedTuple child : children) {
+      for (ResolvedTuple child : children) {
         child.loadNulls(innerCardinality(rowCount));
       }
     }
@@ -402,7 +402,7 @@
     if (children == null) {
       return;
     }
-    for (final ResolvedTuple child : children) {
+    for (ResolvedTuple child : children) {
       child.setRowCount(rowCount);
     }
   }
@@ -426,7 +426,7 @@
       nullBuilder.close();
     }
     if (children != null) {
-      for (final ResolvedTuple child : children) {
+      for (ResolvedTuple child : children) {
         child.close();
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 695e26a..f7e63e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -23,7 +23,13 @@
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
 import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
+import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
+import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
+import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
+import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
+import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
 import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
@@ -44,6 +50,8 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.ZeroVector;
 import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
 import org.apache.drill.exec.vector.complex.DictVector;
@@ -184,14 +192,20 @@
    */
   private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() {
     Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>();
-    rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
-    rules.put(ScanBatch.class, CheckMode.VECTORS);
+    rules.put(OperatorRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(ScanBatch.class, CheckMode.ZERO_SIZE);
     rules.put(ProjectRecordBatch.class, CheckMode.ZERO_SIZE);
     rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
     rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
     rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
     rules.put(HashAggBatch.class, CheckMode.VECTORS);
     rules.put(RemovingRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(StreamingAggBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(RuntimeFilterRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(FlattenRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(MergeJoinBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(NestedLoopJoinBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(LimitRecordBatch.class, CheckMode.ZERO_SIZE);
     return rules;
   }
 
@@ -349,6 +363,8 @@
       validateDictVector(name, (DictVector) vector, topLevel);
     } else if (vector instanceof UnionVector) {
       validateUnionVector(name, (UnionVector) vector);
+    } else if (vector instanceof VarDecimalVector) {
+      validateVarDecimalVector(name, (VarDecimalVector) vector, topLevel);
     } else {
       logger.debug("Don't know how to validate vector: {}  of class {}",
           name, vector.getClass().getSimpleName());
@@ -369,20 +385,22 @@
   }
 
   private void validateVarCharVector(String name, VarCharVector vector, boolean topLevel) {
-    int valueCount = vector.getAccessor().getValueCount();
-
-    // Disabled because a large number of operators
-    // set up offset vectors incorrectly.
-    if (valueCount == 0 && (!topLevel || !checkZeroSize)) {
-      return;
-    }
-
     int dataLength = vector.getBuffer().writerIndex();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false,
-        valueCount, dataLength, topLevel);
+    validateVarWidthVector(name, vector, dataLength, topLevel);
   }
 
   private void validateVarBinaryVector(String name, VarBinaryVector vector, boolean topLevel) {
+    int dataLength = vector.getBuffer().writerIndex();
+    validateVarWidthVector(name, vector, dataLength, topLevel);
+  }
+
+  private void validateVarDecimalVector(String name, VarDecimalVector vector,
+      boolean topLevel) {
+    int dataLength = vector.getBuffer().writerIndex();
+    validateVarWidthVector(name, vector, dataLength, topLevel);
+  }
+
+  private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
 
     // Disabled because a large number of operators
@@ -391,8 +409,7 @@
       return;
     }
 
-    int dataLength = vector.getBuffer().writerIndex();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false,
+    validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
         valueCount, dataLength, topLevel);
   }
 
@@ -401,7 +418,7 @@
     int dataLength = dataVector.getAccessor().getValueCount();
     int valueCount = vector.getAccessor().getValueCount();
     int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
-        true, valueCount, dataLength, topLevel);
+        valueCount, dataLength, topLevel);
 
     if (dataLength != itemCount) {
       error(name, vector, String.format(
@@ -419,7 +436,7 @@
     int valueCount = vector.getAccessor().getValueCount();
     int maxBitCount = valueCount * 8;
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), true, valueCount, maxBitCount, topLevel);
+        vector.getOffsetVector(), valueCount, maxBitCount, topLevel);
     BitVector dataVector = vector.getDataVector();
     if (dataVector.getAccessor().getValueCount() != elementCount) {
       error(name, vector, String.format(
@@ -452,7 +469,7 @@
       RepeatedMapVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
     for (ValueVector child: vector) {
       validateVector(elementCount, child, false);
     }
@@ -461,7 +478,7 @@
   private void validateDictVector(String name, DictVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
     validateVector(elementCount, vector.getKeys(), false);
     validateVector(elementCount, vector.getValues(), false);
   }
@@ -470,7 +487,7 @@
       RepeatedListVector vector, boolean topLevel) {
     int valueCount = vector.getAccessor().getValueCount();
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
     validateVector(elementCount, vector.getDataVector(), false);
   }
 
@@ -500,16 +517,17 @@
   }
 
   private int validateOffsetVector(String name, UInt4Vector offsetVector,
-      boolean repeated, int valueCount, int maxOffset, boolean topLevel) {
+      int valueCount, int maxOffset, boolean topLevel) {
     UInt4Vector.Accessor accessor = offsetVector.getAccessor();
     int offsetCount = accessor.getValueCount();
     // TODO: Disabled because a large number of operators
-    // set up offset vectors incorrectly.
-//    if (!repeated && offsetCount == 0) {
-//      System.out.println(String.format(
-//          "Offset vector for %s: [0] has length 0, expected 1+",
-//          name));
-//      return false;
+    // set up offset vectors incorrectly. Either that, or semantics
+    // are ill-defined: some vectors assume an offset vector length
+    // of 0 if the "outer" value count is zero (which, while fiddly, is
+    // a workable definition.)
+//    if (checkZeroSize && topLevel && offsetCount == 0) {
+//      error(name, offsetVector,
+//          "Offset vector has length 0, expected 1+");
 //    }
     if (valueCount == 0 && offsetCount > 1 || valueCount > 0 && offsetCount != valueCount + 1) {
       error(name, offsetVector, String.format(
@@ -520,7 +538,8 @@
       return 0;
     }
 
-    // First value must be zero in current version.
+    // First value must be zero. (This is why offset vectors have one more
+    // value than the number of rows.)
 
     int prevOffset;
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 4d70065..07f919d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -30,6 +30,8 @@
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of the result set loader. Caches vectors
@@ -163,7 +165,7 @@
     CLOSED
   }
 
-  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResultSetLoaderImpl.class);
+  protected static final Logger logger = LoggerFactory.getLogger(ResultSetLoaderImpl.class);
 
   /**
    * Options provided to this loader.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
index 057c0f2..de8a77e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.nested;
 
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -26,7 +26,6 @@
 
   @Test
   public void test() throws Exception {
-//   test("select r.r_name, t1.f from cp.`tpch/region.parquet` r join (select flatten(x) as f from (select convert_from('[0, 1]', 'json') as x from cp.`tpch/region.parquet`)) t1 on t1.f = r.r_regionkey");
     test("SELECT r.r_name, \n" +
             "       t1.f \n" +
             "FROM   cp.`tpch/region.parquet` r \n" +
@@ -111,5 +110,4 @@
         "(select first_name from cp.`employee.json` where first_name='Sheri')",
         "Flatten does not support inputs of non-list values");
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 07b34c0..7610e71 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -28,22 +28,21 @@
 import java.nio.file.Paths;
 import java.util.List;
 
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
-import org.apache.drill.test.TestBuilder;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.SubDirTestWatcher;
+import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
 @Category(OperatorTest.class)
 public class TestFlatten extends BaseTestQuery {
 
@@ -88,7 +87,6 @@
         .setRecord(jsonRecords)
         .createFiles(1, numCopies, "json");
 
-    @SuppressWarnings("unchecked")
     List<JsonStringHashMap<String,Object>> data = Lists.newArrayList(
         mapOf("uid", 1l,
             "lst_lst_0", listOf(1l, 2l, 3l, 4l, 5l),
@@ -123,7 +121,6 @@
 
   @Test
   public void testFlattenReferenceImpl() throws Exception {
-    @SuppressWarnings("unchecked")
     List<JsonStringHashMap<String,Object>> data = Lists.newArrayList(
         mapOf("a",1,
               "b",2,
@@ -133,7 +130,6 @@
                   listOf(1000,999)
             )));
     List<JsonStringHashMap<String, Object>> result = flatten(flatten(flatten(data, "list_col"), "nested_list_col"), "nested_list_col");
-     @SuppressWarnings("unchecked")
     List<JsonStringHashMap<String, Object>> expectedResult = Lists.newArrayList(
         mapOf("nested_list_col", 100,  "list_col", 10,"a", 1, "b",2),
         mapOf("nested_list_col", 99,   "list_col", 10,"a", 1, "b",2),
@@ -195,7 +191,6 @@
         .setRecord(jsonRecord)
         .createFiles(1, numRecords, "json");
 
-    @SuppressWarnings("unchecked")
     List<JsonStringHashMap<String,Object>> data = Lists.newArrayList(
         mapOf("int_list", inputList)
     );
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 6d408b8..0278749 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -102,7 +102,7 @@
   public FieldReader getReader() { return reader; }
 
   @Override
-  public int getBufferSizeFor(final int valueCount) {
+  public int getBufferSizeFor(int valueCount) {
     if (valueCount == 0) {
       return 0;
     }
@@ -121,8 +121,8 @@
   public Mutator getMutator() { return mutator; }
 
   @Override
-  public void setInitialCapacity(final int valueCount) {
-    final long size = (long) valueCount * VALUE_WIDTH;
+  public void setInitialCapacity(int valueCount) {
+    long size = (long) valueCount * VALUE_WIDTH;
     // TODO: Replace this with MAX_BUFFER_SIZE once all
     // code is aware of the maximum vector size.
     if (size > MAX_ALLOCATION_SIZE) {
@@ -170,7 +170,7 @@
    *           if it can't allocate the new buffer
    */
   @Override
-  public void allocateNew(final int valueCount) {
+  public void allocateNew(int valueCount) {
     allocateBytes(valueCount * VALUE_WIDTH);
   }
 
@@ -182,14 +182,14 @@
     super.reset();
   }
 
-  private void allocateBytes(final long size) {
+  private void allocateBytes(long size) {
     // TODO: Replace this with MAX_BUFFER_SIZE once all
     // code is aware of the maximum vector size.
     if (size > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
 
-    final int curSize = (int)size;
+    int curSize = (int)size;
     clear();
     data = allocator.buffer(curSize);
     data.readerIndex(0);
@@ -214,7 +214,7 @@
         ? 256
         : allocationSizeInBytes * 2L;
 
-    final int currentCapacity = data.capacity();
+    int currentCapacity = data.capacity();
     // Some operations, such as Value Vector#exchange, can be change DrillBuf data field without corresponding allocation size changes.
     // Check that the size of the allocation is sufficient to copy the old buffer.
     while (newAllocationSize < currentCapacity) {
@@ -238,7 +238,7 @@
     if (newAllocationSize == 0) {
       throw new IllegalStateException("Attempt to reAlloc a zero-sized vector");
     }
-    final DrillBuf newBuf = allocator.buffer(newAllocationSize);
+    DrillBuf newBuf = allocator.buffer(newAllocationSize);
     newBuf.setBytes(0, data, 0, data.capacity());
     newBuf.writerIndex(data.writerIndex());
     data.release(1);
@@ -259,9 +259,9 @@
   public void load(SerializedField metadata, DrillBuf buffer) {
     Preconditions.checkArgument(this.field.getName().equals(metadata.getNamePart().getName()),
                                 "The field %s doesn't match the provided metadata %s.", this.field, metadata);
-    final int actualLength = metadata.getBufferLength();
-    final int valueCount = metadata.getValueCount();
-    final int expectedLength = valueCount * VALUE_WIDTH;
+    int actualLength = metadata.getBufferLength();
+    int valueCount = metadata.getValueCount();
+    int expectedLength = valueCount * VALUE_WIDTH;
     assert actualLength == expectedLength : String.format("Expected to load %d bytes but actually loaded %d bytes", expectedLength, actualLength);
 
     clear();
@@ -296,8 +296,8 @@
   }
 
   public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) {
-    final int startPoint = startIndex * VALUE_WIDTH;
-    final int sliceLength = length * VALUE_WIDTH;
+    int startPoint = startIndex * VALUE_WIDTH;
+    int sliceLength = length * VALUE_WIDTH;
     target.clear();
     target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
     target.data.writerIndex(sliceLength);
@@ -402,14 +402,14 @@
     <#if (minor.class == "Interval")>
 
     public void get(int index, ${minor.class}Holder holder) {
-      final int offsetIndex = index * VALUE_WIDTH;
+      int offsetIndex = index * VALUE_WIDTH;
       holder.months = data.getInt(offsetIndex);
       holder.days = data.getInt(offsetIndex + ${minor.daysOffset});
       holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
     }
 
     public void get(int index, Nullable${minor.class}Holder holder) {
-      final int offsetIndex = index * VALUE_WIDTH;
+      int offsetIndex = index * VALUE_WIDTH;
       holder.isSet = 1;
       holder.months = data.getInt(offsetIndex);
       holder.days = data.getInt(offsetIndex + ${minor.daysOffset});
@@ -418,30 +418,30 @@
 
     @Override
     public ${friendlyType} getObject(int index) {
-      final int offsetIndex = index * VALUE_WIDTH;
-      final int months  = data.getInt(offsetIndex);
-      final int days    = data.getInt(offsetIndex + ${minor.daysOffset});
-      final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+      int offsetIndex = index * VALUE_WIDTH;
+      int months  = data.getInt(offsetIndex);
+      int days    = data.getInt(offsetIndex + ${minor.daysOffset});
+      int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
       return DateUtilities.fromInterval(months, days, millis);
     }
 
     public StringBuilder getAsStringBuilder(int index) {
-      final int offsetIndex = index * VALUE_WIDTH;
-      final int months = data.getInt(offsetIndex);
-      final int days   = data.getInt(offsetIndex + ${minor.daysOffset});
-      final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+      int offsetIndex = index * VALUE_WIDTH;
+      int months = data.getInt(offsetIndex);
+      int days   = data.getInt(offsetIndex + ${minor.daysOffset});
+      int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
       return DateUtilities.intervalStringBuilder(months, days, millis);
     }
     <#elseif (minor.class == "IntervalDay")>
 
     public void get(int index, ${minor.class}Holder holder) {
-      final int offsetIndex = index * VALUE_WIDTH;
+      int offsetIndex = index * VALUE_WIDTH;
       holder.days = data.getInt(offsetIndex);
       holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
     }
 
     public void get(int index, Nullable${minor.class}Holder holder) {
-      final int offsetIndex = index * VALUE_WIDTH;
+      int offsetIndex = index * VALUE_WIDTH;
       holder.isSet = 1;
       holder.days = data.getInt(offsetIndex);
       holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
@@ -449,16 +449,16 @@
 
     @Override
     public ${friendlyType} getObject(int index) {
-      final int offsetIndex = index * VALUE_WIDTH;
-      final int days   = data.getInt(offsetIndex);
-      final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+      int offsetIndex = index * VALUE_WIDTH;
+      int days   = data.getInt(offsetIndex);
+      int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
       return DateUtilities.fromIntervalDay(days, millis);
     }
 
     public StringBuilder getAsStringBuilder(int index) {
-      final int offsetIndex = index * VALUE_WIDTH;
-      final int days   = data.getInt(offsetIndex);
-      final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+      int offsetIndex = index * VALUE_WIDTH;
+      int days   = data.getInt(offsetIndex);
+      int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
       return DateUtilities.intervalDayStringBuilder(days, millis);
     }
     <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
@@ -551,7 +551,7 @@
 
     @Override
     public ${friendlyType} getObject(int index) {
-      final BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value());
+      BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value());
       return new BigDecimal(value, getField().getScale());
     }
     <#else>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index 9f0b1d5..1ca1f5e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -215,8 +215,8 @@
     // This is performance critical code; every operation counts.
     // Please be thoughtful when changing the code.
 
-    final int valueIndex = prepareFill();
-    final int fillCount = valueIndex - lastWriteIndex - 1;
+    int valueIndex = prepareFill();
+    int fillCount = valueIndex - lastWriteIndex - 1;
     if (fillCount > 0) {
       fillEmpties(fillCount);
     }
@@ -228,7 +228,7 @@
   }
 
   public final int prepareFill() {
-    final int valueIndex = vectorIndex.vectorIndex();
+    int valueIndex = vectorIndex.vectorIndex();
     if (valueIndex + 1 < capacity) {
       return valueIndex;
     }
@@ -240,32 +240,32 @@
   }
 
   @Override
-  protected final void fillEmpties(final int fillCount) {
+  protected final void fillEmpties(int fillCount) {
     for (int i = 0; i < fillCount; i++) {
       fillOffset(nextOffset);
     }
   }
 
   @Override
-  public final void setNextOffset(final int newOffset) {
-    final int writeIndex = prepareWrite();
+  public final void setNextOffset(int newOffset) {
+    int writeIndex = prepareWrite();
     drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
     nextOffset = newOffset;
   }
 
-  public final void reviseOffset(final int newOffset) {
-    final int writeIndex = vectorIndex.vectorIndex() + 1;
+  public final void reviseOffset(int newOffset) {
+    int writeIndex = vectorIndex.vectorIndex() + 1;
     drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
     nextOffset = newOffset;
   }
 
-  public final void fillOffset(final int newOffset) {
+  public final void fillOffset(int newOffset) {
     drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, newOffset);
     nextOffset = newOffset;
   }
 
   @Override
-  public final void setValue(final Object value) {
+  public final void setValue(Object value) {
     throw new InvalidConversionError(
         "setValue() not supported for the offset vector writer: " + value);
   }
@@ -298,19 +298,22 @@
 
   @Override
   public void postRollover() {
-    final int newNext = nextOffset - rowStartOffset;
+    int newNext = nextOffset - rowStartOffset;
     super.postRollover();
     nextOffset = newNext;
   }
 
   @Override
   public void setValueCount(int valueCount) {
-    mandatoryResize(valueCount);
 
-    // Value count is in row positions.
+    // Value count is in row positions, not index
+    // positions. (There are one more index positions
+    // than row positions.)
 
+    int offsetCount = valueCount + 1;
+    mandatoryResize(offsetCount);
     fillEmpties(valueCount - lastWriteIndex - 1);
-    vector().getBuffer().writerIndex((valueCount + 1) * VALUE_WIDTH);
+    vector().getBuffer().writerIndex(offsetCount * VALUE_WIDTH);
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index f19709d..046641b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.vector.complex;
 
-import io.netty.buffer.DrillBuf;
-
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
@@ -27,8 +25,8 @@
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeRuntimeException;
 import org.apache.drill.exec.expr.BasicTypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -38,10 +36,11 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VectorDescriptor;
 import org.apache.drill.exec.vector.ZeroVector;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ObjectArrays;
 
+import io.netty.buffer.DrillBuf;
+
 public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector {
 
   public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
@@ -349,6 +348,12 @@
     @Override
     public void setValueCount(int valueCount) {
       // TODO: populate offset end points
+      // Convention seems to be that if the "outer" value count
+      // is zero, then the offset vector can also be zero-length:
+      // the normal initial zero position is omitted. While this
+      // saves a bit of memory, it greatly complicates code that
+      // works with vectors because of the special case for zero-length
+      // vectors.
       offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1);
       final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
       vector.getMutator().setValueCount(childValueCount);