DRILL-7414: EVF incorrectly sets buffer writer index after rollover

Enabling the vector validator on the "new" scan operator, in cases
in which overflow occurs, identified that the DrillBuf writer index
was not properly set for repeated vectors.

Enables such checking, adds unit tests, and fixes the writer index
issue.

closes #1878
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index dde6583..2753f55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -20,6 +20,7 @@
 import java.util.IdentityHashMap;
 import java.util.Map;
 
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -153,13 +154,13 @@
    * unnecessary.
    */
   private static Map<Class<? extends RecordBatch>, CheckMode> buildRules() {
-    final Map<Class<? extends RecordBatch>, CheckMode> rules = new IdentityHashMap<>();
-    //rules.put(OperatorRecordBatch.class, CheckMode.ALL);
+    Map<Class<? extends RecordBatch>, CheckMode> rules = new IdentityHashMap<>();
+    rules.put(OperatorRecordBatch.class, CheckMode.ALL);
     return rules;
   }
 
   public static boolean validate(RecordBatch batch) {
-    final CheckMode checkMode = checkRules.get(batch.getClass());
+    CheckMode checkMode = checkRules.get(batch.getClass());
 
     // If no rule, don't check this batch.
 
@@ -173,10 +174,10 @@
 
     // All batches that do any checks will at least check counts.
 
-    final ErrorReporter reporter = errorReporter(batch);
-    final int rowCount = batch.getRecordCount();
+    ErrorReporter reporter = errorReporter(batch);
+    int rowCount = batch.getRecordCount();
     int valueCount = rowCount;
-    final VectorContainer container = batch.getContainer();
+    VectorContainer container = batch.getContainer();
     if (!container.hasRecordCount()) {
       reporter.error(String.format(
           "%s: Container record count not set",
@@ -185,11 +186,11 @@
       // Row count will = container count for most operators.
       // Row count <= container count for the filter operator.
 
-      final int containerRowCount = container.getRecordCount();
+      int containerRowCount = container.getRecordCount();
       valueCount = containerRowCount;
       switch (batch.getSchema().getSelectionVectorMode()) {
       case FOUR_BYTE:
-        final int sv4Count = batch.getSelectionVector4().getCount();
+        int sv4Count = batch.getSelectionVector4().getCount();
         if (sv4Count != rowCount) {
           reporter.error(String.format(
               "Mismatch between %s record count = %d, SV4 record count = %d",
@@ -199,7 +200,7 @@
         // TODO: Don't know how to check SV4 batches
         return true;
       case TWO_BYTE:
-        final int sv2Count = batch.getSelectionVector2().getCount();
+        int sv2Count = batch.getSelectionVector2().getCount();
         if (sv2Count != rowCount) {
           reporter.error(String.format(
               "Mismatch between %s record count = %d, SV2 record count = %d",
@@ -212,7 +213,7 @@
               batch.getClass().getSimpleName(),
               containerRowCount, sv2Count));
         }
-        final int svTotalCount = batch.getSelectionVector2().getBatchActualRecordCount();
+        int svTotalCount = batch.getSelectionVector2().getBatchActualRecordCount();
         if (svTotalCount != containerRowCount) {
           reporter.error(String.format(
               "Mismatch between %s container count = %d, SV2 total count = %d",
@@ -237,13 +238,13 @@
   }
 
   public static boolean validate(VectorAccessible batch) {
-    final ErrorReporter reporter = errorReporter(batch);
+    ErrorReporter reporter = errorReporter(batch);
     new BatchValidator(reporter).validateBatch(batch, batch.getRecordCount());
     return reporter.errorCount() == 0;
   }
 
   private static ErrorReporter errorReporter(VectorAccessible batch) {
-    final String opName = batch.getClass().getSimpleName();
+    String opName = batch.getClass().getSimpleName();
     if (LOG_TO_STDOUT) {
       return new StdOutReporter(opName);
     } else {
@@ -252,7 +253,7 @@
   }
 
   public void validateBatch(VectorAccessible batch, int rowCount) {
-    for (final VectorWrapper<? extends ValueVector> w : batch) {
+    for (VectorWrapper<? extends ValueVector> w : batch) {
       validateWrapper(rowCount, w);
     }
   }
@@ -264,7 +265,7 @@
   }
 
   private void validateVector(int expectedCount, ValueVector vector) {
-    final int valueCount = vector.getAccessor().getValueCount();
+    int valueCount = vector.getAccessor().getValueCount();
     if (valueCount != expectedCount) {
       error(vector.getField().getName(), vector,
           String.format("Row count = %d, but value count = %d",
@@ -293,9 +294,9 @@
   }
 
   private void validateNullableVector(String name, NullableVector vector) {
-    final int outerCount = vector.getAccessor().getValueCount();
-    final ValueVector valuesVector = vector.getValuesVector();
-    final int valueCount = valuesVector.getAccessor().getValueCount();
+    int outerCount = vector.getAccessor().getValueCount();
+    ValueVector valuesVector = vector.getValuesVector();
+    int valueCount = valuesVector.getAccessor().getValueCount();
     if (valueCount != outerCount) {
       error(name, vector, String.format(
           "Outer value count = %d, but inner value count = %d",
@@ -318,7 +319,7 @@
   }
 
   private void validateVarCharVector(String name, VarCharVector vector) {
-    final int valueCount = vector.getAccessor().getValueCount();
+    int valueCount = vector.getAccessor().getValueCount();
 
     // Disabled because a large number of operators
     // set up offset vectors wrongly.
@@ -326,15 +327,15 @@
       return;
     }
 
-    final int dataLength = vector.getBuffer().writerIndex();
+    int dataLength = vector.getBuffer().writerIndex();
     validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, valueCount, dataLength);
   }
 
   private void validateRepeatedVector(String name, BaseRepeatedValueVector vector) {
-    final ValueVector dataVector = vector.getDataVector();
-    final int dataLength = dataVector.getAccessor().getValueCount();
-    final int valueCount = vector.getAccessor().getValueCount();
-    final int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
+    ValueVector dataVector = vector.getDataVector();
+    int dataLength = dataVector.getAccessor().getValueCount();
+    int valueCount = vector.getAccessor().getValueCount();
+    int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
         true, valueCount, dataLength);
 
     if (dataLength != itemCount) {
@@ -352,11 +353,11 @@
   }
 
   private void validateRepeatedBitVector(String name, RepeatedBitVector vector) {
-    final int valueCount = vector.getAccessor().getValueCount();
-    final int maxBitCount = valueCount * 8;
-    final int elementCount = validateOffsetVector(name + "-offsets",
+    int valueCount = vector.getAccessor().getValueCount();
+    int maxBitCount = valueCount * 8;
+    int elementCount = validateOffsetVector(name + "-offsets",
         vector.getOffsetVector(), true, valueCount, maxBitCount);
-    final BitVector dataVector = vector.getDataVector();
+    BitVector dataVector = vector.getDataVector();
     if (dataVector.getAccessor().getValueCount() != elementCount) {
       error(name, vector, String.format(
           "Bit vector has %d values, but offset vector labels %d values",
@@ -372,10 +373,10 @@
   }
 
   private void validateBitVector(String name, BitVector vector) {
-    final BitVector.Accessor accessor = vector.getAccessor();
-    final int valueCount = accessor.getValueCount();
-    final int dataLength = vector.getBuffer().writerIndex();
-    final int expectedLength = BitVector.getSizeFromCount(valueCount);
+    BitVector.Accessor accessor = vector.getAccessor();
+    int valueCount = accessor.getValueCount();
+    int dataLength = vector.getBuffer().writerIndex();
+    int expectedLength = BitVector.getSizeFromCount(valueCount);
     if (dataLength != expectedLength) {
       error(name, vector, String.format(
           "Bit vector has %d values, buffer has length %d, expected %d",
@@ -385,8 +386,8 @@
 
   private int validateOffsetVector(String name, UInt4Vector offsetVector,
       boolean repeated, int valueCount, int maxOffset) {
-    final UInt4Vector.Accessor accessor = offsetVector.getAccessor();
-    final int offsetCount = accessor.getValueCount();
+    UInt4Vector.Accessor accessor = offsetVector.getAccessor();
+    int offsetCount = accessor.getValueCount();
     // TODO: Disabled because a large number of operators
     // set up offset vectors incorrectly.
 //    if (!repeated && offsetCount == 0) {
@@ -412,7 +413,7 @@
     }
 
     for (int i = 1; i < offsetCount; i++) {
-      final int offset = accessor.get(i);
+      int offset = accessor.get(i);
       if (offset < prevOffset) {
         error(name, offsetVector, String.format(
             "Offset vector [%d] contained %d, expected >= %d",
@@ -432,19 +433,19 @@
   }
 
   private void verifyIsSetVector(ValueVector parent, UInt1Vector bv) {
-    final String name = String.format("%s (%s)-bits",
+    String name = String.format("%s (%s)-bits",
         parent.getField().getName(),
         parent.getClass().getSimpleName());
-    final int rowCount = parent.getAccessor().getValueCount();
-    final int bitCount = bv.getAccessor().getValueCount();
+    int rowCount = parent.getAccessor().getValueCount();
+    int bitCount = bv.getAccessor().getValueCount();
     if (bitCount != rowCount) {
       error(name, bv, String.format(
           "Value count = %d, but bit count = %d",
           rowCount, bitCount));
     }
-    final UInt1Vector.Accessor ba = bv.getAccessor();
+    UInt1Vector.Accessor ba = bv.getAccessor();
     for (int i = 0; i < bitCount; i++) {
-      final int value = ba.get(i);
+      int value = ba.get(i);
       if (value != 0 && value != 1) {
         error(name, bv, String.format(
             "Bit vector[%d] = %d, expected 0 or 1",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java
index 5fffe40..8688add 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RepeatedVectorState.java
@@ -47,7 +47,7 @@
     // vector, and the scalar (value) portion of the array writer.
 
     arrayWriter = (AbstractArrayWriter) writer;
-    AbstractScalarWriterImpl colWriter = (AbstractScalarWriterImpl) writer.entry().events();
+    final AbstractScalarWriterImpl colWriter = (AbstractScalarWriterImpl) writer.entry().events();
     valuesState = SimpleVectorState.vectorState(writer.schema(), colWriter, vector.getDataVector());
 
     // Create the offsets state with the offset vector portion of the repeated
@@ -116,19 +116,19 @@
    * after the values copied during roll-over.</li>
    * </ul>
    *
-   * @param cardinality the number of outer elements to create in the look-ahead
+   * @param newCardinality the number of outer elements to create in the look-ahead
    * vector
    */
 
   @Override
-  public void rollover(int cardinality) {
+  public void rollover(int newCardinality) {
 
     // Swap out the two vectors. The index presented to the caller
     // is that of the data vector: the next position in the data
     // vector to be set into the data vector writer index.
 
-    valuesState.rollover(childCardinality(cardinality));
-    offsetsState.rollover(cardinality);
+    valuesState.rollover(childCardinality(newCardinality));
+    offsetsState.rollover(newCardinality);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
index faf2031..60fb7f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/SingleVectorState.java
@@ -123,7 +123,7 @@
 
       // Cap the allocated size to the maximum.
 
-      final int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
+      int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
       ((VariableWidthVector) vector).allocateNew(size, cardinality);
       return vector.getAllocatedSize();
     }
@@ -199,9 +199,9 @@
       // for the current row. We must subtract that offset from each copied
       // value to adjust the offset for the destination.
 
-      final UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
-      final UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
-      final int offset = childWriter.rowStartIndex();
+      UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor();
+      UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
+      int offset = childWriter.rowStartIndex();
       int newIndex = 1;
       ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
           Math.max(0, sourceEndIndex - sourceStartIndex + 1),
@@ -261,13 +261,13 @@
   @Override
   public void rollover(int cardinality) {
 
-    final int sourceStartIndex = writer.rowStartIndex();
+    int sourceStartIndex = writer.rowStartIndex();
 
     // Remember the last write index for the original vector.
     // This tells us the end of the set of values to move, while the
     // sourceStartIndex above tells us the start.
 
-    final int sourceEndIndex = writer.lastWriteIndex();
+    int sourceEndIndex = writer.lastWriteIndex();
 
     // Switch buffers between the backup vector and the writer's output
     // vector. Done this way because writers are bound to vectors and
@@ -306,7 +306,7 @@
    */
 
   protected static MajorType parseVectorType(ValueVector vector) {
-    final MajorType purportedType = vector.getField().getType();
+    MajorType purportedType = vector.getField().getType();
     if (purportedType.getMode() != DataMode.OPTIONAL) {
       return purportedType;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index f803ef4..51e47ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -17,11 +17,22 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.IndirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -30,20 +41,9 @@
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.physical.rowSet.DirectRowSet;
-import org.apache.drill.exec.physical.rowSet.IndirectRowSet;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-
-import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class MockRecordBatch implements CloseableRecordBatch {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordBatch.class);
 
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
@@ -61,12 +61,12 @@
   protected final OperatorContext oContext;
   protected final BufferAllocator allocator;
 
-  private MockRecordBatch(@NotNull final FragmentContext context,
-                          @Nullable final OperatorContext oContext,
-                          @NotNull final List<RowSet> testRowSets,
-                          @NotNull final List<IterOutcome> iterOutcomes,
-                          @NotNull final BatchSchema schema,
-                          final boolean dummy) {
+  private MockRecordBatch(@NotNull FragmentContext context,
+                          @Nullable OperatorContext oContext,
+                          @NotNull List<RowSet> testRowSets,
+                          @NotNull List<IterOutcome> iterOutcomes,
+                          @NotNull BatchSchema schema,
+                          boolean dummy) {
     Preconditions.checkNotNull(testRowSets);
     Preconditions.checkNotNull(iterOutcomes);
     Preconditions.checkNotNull(schema);
@@ -83,11 +83,11 @@
   }
 
   @Deprecated
-  public MockRecordBatch(@Nullable final FragmentContext context,
-                         @Nullable final OperatorContext oContext,
-                         @NotNull final List<VectorContainer> testContainers,
-                         @NotNull final List<IterOutcome> iterOutcomes,
-                         final BatchSchema schema) {
+  public MockRecordBatch(@Nullable FragmentContext context,
+                         @Nullable OperatorContext oContext,
+                         @NotNull List<VectorContainer> testContainers,
+                         @NotNull List<IterOutcome> iterOutcomes,
+                         BatchSchema schema) {
     this(context,
          oContext,
          testContainers.stream().
@@ -99,12 +99,12 @@
   }
 
   @Deprecated
-  public MockRecordBatch(@Nullable final FragmentContext context,
-                         @Nullable final OperatorContext oContext,
-                         @NotNull final List<VectorContainer> testContainers,
-                         @NotNull final List<IterOutcome> iterOutcomes,
-                         @NotNull final List<SelectionVector2> selectionVector2s,
-                         final BatchSchema schema) {
+  public MockRecordBatch(@Nullable FragmentContext context,
+                         @Nullable OperatorContext oContext,
+                         @NotNull List<VectorContainer> testContainers,
+                         @NotNull List<IterOutcome> iterOutcomes,
+                         @NotNull List<SelectionVector2> selectionVector2s,
+                         BatchSchema schema) {
     this(context,
       oContext,
       new Supplier<List<RowSet>>() {
@@ -200,11 +200,11 @@
     IterOutcome currentOutcome;
 
     if (currentContainerIndex < rowSets.size()) {
-      final RowSet rowSet = rowSets.get(currentContainerIndex);
-      final VectorContainer input = rowSet.container();
+      RowSet rowSet = rowSets.get(currentContainerIndex);
+      VectorContainer input = rowSet.container();
       // We need to do this since the downstream operator expects vector reference to be same
       // after first next call in cases when schema is not changed
-      final BatchSchema inputSchema = input.getSchema();
+      BatchSchema inputSchema = input.getSchema();
       if (!container.getSchema().isEquivalent(inputSchema)) {
         container.clear();
         container = new VectorContainer(allocator, inputSchema);
@@ -217,7 +217,7 @@
           if ( input.hasRecordCount() ) { // in case special test of uninitialized input container
             container.setRecordCount(input.getRecordCount());
           }
-          final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
+          SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
 
           if (sv2 != null) {
             // Operators assume that new values for an Sv2 are transferred in.
@@ -307,23 +307,23 @@
     public Builder() {
     }
 
-    private Builder sendData(final RowSet rowSet, final IterOutcome outcome) {
+    private Builder sendData(RowSet rowSet, IterOutcome outcome) {
       Preconditions.checkState(batchSchema == null);
       rowSets.add(rowSet);
       iterOutcomes.add(outcome);
       return this;
     }
 
-    public Builder sendData(final RowSet rowSet) {
-      final IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
+    public Builder sendData(RowSet rowSet) {
+      IterOutcome outcome = rowSets.isEmpty()? IterOutcome.OK_NEW_SCHEMA: IterOutcome.OK;
       return sendData(rowSet, outcome);
     }
 
-    public Builder sendDataWithNewSchema(final RowSet rowSet) {
+    public Builder sendDataWithNewSchema(RowSet rowSet) {
       return sendData(rowSet, IterOutcome.OK_NEW_SCHEMA);
     }
 
-    public Builder sendDataAndEmit(final RowSet rowSet) {
+    public Builder sendDataAndEmit(RowSet rowSet) {
       return sendData(rowSet, IterOutcome.EMIT);
     }
 
@@ -335,18 +335,18 @@
       return this;
     }
 
-    public Builder setSchema(final BatchSchema batchSchema) {
+    public Builder setSchema(BatchSchema batchSchema) {
       Preconditions.checkState(!rowSets.isEmpty());
       this.batchSchema = Preconditions.checkNotNull(batchSchema);
       return this;
     }
 
-    public Builder withOperatorContext(final OperatorContext oContext) {
+    public Builder withOperatorContext(OperatorContext oContext) {
       this.oContext = Preconditions.checkNotNull(oContext);
       return this;
     }
 
-    public MockRecordBatch build(final FragmentContext context) {
+    public MockRecordBatch build(FragmentContext context) {
       BatchSchema tempSchema = batchSchema;
 
       if (tempSchema == null && !rowSets.isEmpty()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java
index a691b2f..7dae42c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderOverflow.java
@@ -28,10 +28,14 @@
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.validate.BatchValidator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
@@ -40,8 +44,6 @@
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.test.SubOperatorTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -72,42 +74,51 @@
     rsLoader.startBatch();
     byte[] value = new byte[512];
     Arrays.fill(value, (byte) 'X');
-    int count = 0;
-    while (! rootWriter.isFull()) {
-      rootWriter.start();
-      rootWriter.scalar(0).setBytes(value, value.length);
-      rootWriter.save();
-      count++;
-    }
 
     // Number of rows should be driven by vector size.
     // Our row count should include the overflow row
 
     int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
-    assertEquals(expectedCount + 1, count);
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        rootWriter.scalar(0).setBytes(value, value.length);
+        rootWriter.save();
+        count++;
+      }
 
-    // Loader's row count should include only "visible" rows
+      assertEquals(expectedCount + 1, count);
 
-    assertEquals(expectedCount, rootWriter.rowCount());
+      // Loader's row count should include only "visible" rows
 
-    // Total count should include invisible and look-ahead rows.
+      assertEquals(expectedCount, rootWriter.rowCount());
 
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      // Total count should include invisible and look-ahead rows.
 
-    // Result should exclude the overflow row
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
-    result.clear();
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
 
     // Next batch should start with the overflow row
 
-    rsLoader.startBatch();
-    assertEquals(1, rootWriter.rowCount());
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(1, result.rowCount());
-    result.clear();
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
 
     rsLoader.close();
   }
@@ -135,41 +146,52 @@
     rsLoader.startBatch();
     byte[] value = new byte[512];
     Arrays.fill(value, (byte) 'X');
-    int count = 0;
-    while (! rootWriter.isFull()) {
-      rootWriter.start();
-      rootWriter.scalar(0).setBytes(value, value.length);
-      rootWriter.save();
-      count++;
-    }
 
     // Our row count should include the overflow row
 
     int expectedCount = 8 * 1024 * 1024 / value.length;
-    assertEquals(expectedCount + 1, count);
 
-    // Loader's row count should include only "visible" rows
+    // First batch, with overflow
 
-    assertEquals(expectedCount, rootWriter.rowCount());
+    {
+      int count = 0;
+      while (! rootWriter.isFull()) {
+        rootWriter.start();
+        rootWriter.scalar(0).setBytes(value, value.length);
+        rootWriter.save();
+        count++;
+      }
+      assertEquals(expectedCount + 1, count);
 
-    // Total count should include invisible and look-ahead rows.
+      // Loader's row count should include only "visible" rows
 
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      assertEquals(expectedCount, rootWriter.rowCount());
 
-    // Result should exclude the overflow row
+      // Total count should include invisible and look-ahead rows.
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
-    result.clear();
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+      // Result should exclude the overflow row
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      result.clear();
+    }
 
     // Next batch should start with the overflow row
 
-    rsLoader.startBatch();
-    assertEquals(1, rootWriter.rowCount());
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(1, result.rowCount());
-    result.clear();
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      result.clear();
+    }
 
     rsLoader.close();
   }
@@ -207,7 +229,9 @@
 
     // Harvest the full batch
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
+    VectorContainer container = rsLoader.harvest();
+    BatchValidator.validate(container);
+    RowSet result = fixture.wrap(container);
     result.clear();
 
     // Close without harvesting the overflow batch.
@@ -279,73 +303,79 @@
     byte[] value = new byte[473];
     Arrays.fill(value, (byte) 'X');
     String strValue = new String(value, Charsets.UTF_8);
-    int count = 0;
-    int rowSize = 0;
-    int totalSize = 0;
     int valuesPerArray = 13;
-    while (rootWriter.start()) {
-      totalSize += rowSize;
-      rowSize = 0;
-      ScalarWriter array = rootWriter.array(0).scalar();
-      for (int i = 0; i < valuesPerArray; i++) {
-        String cellValue = strValue + (count + 1) + "." + i;
-        array.setString(cellValue);
-        rowSize += cellValue.length();
+    int count = 0;
+
+    {
+      int rowSize = 0;
+      int totalSize = 0;
+      while (rootWriter.start()) {
+        totalSize += rowSize;
+        rowSize = 0;
+        ScalarWriter array = rootWriter.array(0).scalar();
+        for (int i = 0; i < valuesPerArray; i++) {
+          String cellValue = strValue + (count + 1) + "." + i;
+          array.setString(cellValue);
+          rowSize += cellValue.length();
+        }
+        rootWriter.save();
+        count++;
       }
-      rootWriter.save();
-      count++;
+
+      // Row count should include the overflow row.
+
+      int expectedCount = count - 1;
+
+      // Size without overflow row should fit in the vector, size
+      // with overflow should not.
+
+      assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE);
+      assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE);
+
+      // Result should exclude the overflow row. Last row
+      // should hold the last full array.
+
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
+      RowSetReader reader = result.reader();
+      reader.setPosition(expectedCount - 1);
+      ArrayReader arrayReader = reader.array(0);
+      ScalarReader strReader = arrayReader.scalar();
+      assertEquals(valuesPerArray, arrayReader.size());
+      for (int i = 0; i < valuesPerArray; i++) {
+        assertTrue(arrayReader.next());
+        String cellValue = strValue + (count - 1) + "." + i;
+        assertEquals(cellValue, strReader.getString());
+      }
+      result.clear();
     }
 
-    // Row count should include the overflow row.
-
-    int expectedCount = count - 1;
-
-    // Size without overflow row should fit in the vector, size
-    // with overflow should not.
-
-    assertTrue(totalSize <= ValueVector.MAX_BUFFER_SIZE);
-    assertTrue(totalSize + rowSize > ValueVector.MAX_BUFFER_SIZE);
-
-    // Result should exclude the overflow row. Last row
-    // should hold the last full array.
-
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
-    RowSetReader reader = result.reader();
-    reader.setPosition(expectedCount - 1);
-    ArrayReader arrayReader = reader.array(0);
-    ScalarReader strReader = arrayReader.scalar();
-    assertEquals(valuesPerArray, arrayReader.size());
-    for (int i = 0; i < valuesPerArray; i++) {
-      assertTrue(arrayReader.next());
-      String cellValue = strValue + (count - 1) + "." + i;
-      assertEquals(cellValue, strReader.getString());
-    }
-    result.clear();
-
     // Next batch should start with the overflow row.
     // The only row in this next batch should be the whole
     // array being written at the time of overflow.
 
-    rsLoader.startBatch();
-//    VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
-//    ((ResultSetLoaderImpl) rsLoader).dump(new HierarchicalPrinter());
-    assertEquals(1, rootWriter.rowCount());
-    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-//    VectorPrinter.printStrings((VarCharVector) ((VarCharColumnWriter) rootWriter.array(0).scalar()).vector(), 0, 5);
-    assertEquals(1, result.rowCount());
-    reader = result.reader();
-    reader.next();
-    arrayReader = reader.array(0);
-    strReader = arrayReader.scalar();
-    assertEquals(valuesPerArray, arrayReader.size());
-    for (int i = 0; i < valuesPerArray; i++) {
-      assertTrue(arrayReader.next());
-      String cellValue = strValue + (count) + "." + i;
-      assertEquals(cellValue, strReader.getString());
+    {
+      rsLoader.startBatch();
+      assertEquals(1, rootWriter.rowCount());
+      assertEquals(count, rsLoader.totalRowCount());
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      RowSetReader reader = result.reader();
+      reader.next();
+      ArrayReader arrayReader = reader.array(0);
+      ScalarReader strReader = arrayReader.scalar();
+      assertEquals(valuesPerArray, arrayReader.size());
+      for (int i = 0; i < valuesPerArray; i++) {
+        assertTrue(arrayReader.next());
+        String cellValue = strValue + count + "." + i;
+        assertEquals(cellValue, strReader.getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     rsLoader.close();
   }
@@ -429,10 +459,11 @@
 
     // Verify
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(count - 1, result.rowCount());
-
     {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(count - 1, result.rowCount());
       RowSetReader reader = result.reader();
       ArrayReader aArray = reader.array("a");
       ScalarReader aReader = aArray.scalar();
@@ -502,10 +533,11 @@
       count++;
     }
 
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(6, result.rowCount());
-
     {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(6, result.rowCount());
       RowSetReader reader = result.reader();
       ArrayReader aArray = reader.array("a");
       ScalarReader aReader = aArray.scalar();
@@ -550,8 +582,8 @@
         }
         j++;
       }
+      result.clear();
     }
-    result.clear();
 
     rsLoader.close();
   }
@@ -624,7 +656,9 @@
       rowId++;
     }
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
+    VectorContainer container = rsLoader.harvest();
+    BatchValidator.validate(container);
+    RowSet result = fixture.wrap(container);
     assertEquals(rowId - 1, result.rowCount());
     RowSetReader reader = result.reader();
     ArrayReader cArray = reader.array("c");
@@ -677,30 +711,38 @@
 
     // Result should exclude the overflow row
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(count - 1, result.rowCount());
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(count - 1, result.rowCount());
 
-    RowSetReader reader = result.reader();
-    while (reader.next()) {
-      assertEquals(reader.offset(), reader.scalar(0).getInt());
-      assertTrue(reader.scalar(1).isNull());
-      assertArrayEquals(value, reader.scalar(2).getBytes());
-      assertTrue(reader.scalar(3).isNull());
+      RowSetReader reader = result.reader();
+      while (reader.next()) {
+        assertEquals(reader.offset(), reader.scalar(0).getInt());
+        assertTrue(reader.scalar(1).isNull());
+        assertArrayEquals(value, reader.scalar(2).getBytes());
+        assertTrue(reader.scalar(3).isNull());
+      }
+      result.clear();
     }
-    result.clear();
 
     // Next batch should start with the overflow row
 
     rsLoader.startBatch();
-    result = fixture.wrap(rsLoader.harvest());
-    reader = result.reader();
-    assertEquals(1, result.rowCount());
-    assertTrue(reader.next());
-    assertEquals(count - 1, reader.scalar(0).getInt());
-    assertTrue(reader.scalar(1).isNull());
-    assertArrayEquals(value, reader.scalar(2).getBytes());
-    assertTrue(reader.scalar(3).isNull());
-    result.clear();
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      RowSetReader reader = result.reader();
+      assertEquals(1, result.rowCount());
+      assertTrue(reader.next());
+      assertEquals(count - 1, reader.scalar(0).getInt());
+      assertTrue(reader.scalar(1).isNull());
+      assertArrayEquals(value, reader.scalar(2).getBytes());
+      assertTrue(reader.scalar(3).isNull());
+      result.clear();
+    }
 
     rsLoader.close();
   }
@@ -721,6 +763,11 @@
     byte[] head = "abc".getBytes();
     byte[] tail = new byte[523];
     Arrays.fill(tail, (byte) 'X');
+
+    String expected = new String(head, Charsets.UTF_8);
+    expected += new String(tail, Charsets.UTF_8);
+    expected += new String(tail, Charsets.UTF_8);
+
     int count = 0;
     ScalarWriter colWriter = rootWriter.scalar(0);
     while (! rootWriter.isFull()) {
@@ -749,32 +796,37 @@
 
     // Result should exclude the overflow row
 
-    RowSet result = fixture.wrap(rsLoader.harvest());
-    assertEquals(expectedCount, result.rowCount());
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(expectedCount, result.rowCount());
 
-    // Verify that the values were, in fact, appended.
+      // Verify that the values were, in fact, appended.
 
-    String expected = new String(head, Charsets.UTF_8);
-    expected += new String(tail, Charsets.UTF_8);
-    expected += new String(tail, Charsets.UTF_8);
-    RowSetReader reader = result.reader();
-    while (reader.next()) {
-      assertEquals(expected, reader.scalar(0).getString());
+      RowSetReader reader = result.reader();
+      while (reader.next()) {
+        assertEquals(expected, reader.scalar(0).getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     // Next batch should start with the overflow row
 
     rsLoader.startBatch();
     assertEquals(1, rootWriter.rowCount());
     assertEquals(expectedCount + 1, rsLoader.totalRowCount());
-    result = fixture.wrap(rsLoader.harvest());
-    assertEquals(1, result.rowCount());
-    reader = result.reader();
-    while (reader.next()) {
-      assertEquals(expected, reader.scalar(0).getString());
+    {
+      VectorContainer container = rsLoader.harvest();
+      BatchValidator.validate(container);
+      RowSet result = fixture.wrap(container);
+      assertEquals(1, result.rowCount());
+      RowSetReader reader = result.reader();
+      while (reader.next()) {
+        assertEquals(expected, reader.scalar(0).getString());
+      }
+      result.clear();
     }
-    result.clear();
 
     rsLoader.close();
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index 1da362a..9f0b1d5 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -288,9 +288,12 @@
     // Rollover is occurring. This means the current row is not complete.
     // We want to keep 0..(row index - 1) which gives us (row index)
     // rows. But, this being an offset vector, we add one to account
-    // for the extra 0 value at the start.
+    // for the extra 0 value at the start. That is, we want to set
+    // the value count to the current row start index, which already
+    // is set to one past the index of the last zero-based index.
+    // (Offset vector indexes are confusing.)
 
-    setValueCount(vectorIndex.rowStartIndex() + 1);
+    setValueCount(vectorIndex.rowStartIndex());
   }
 
   @Override