DRILL-7324: Final set of "batch count" fixes

Final set of fixes for batch count/record count issues. Enables
vector checking for all operators.

closes #1912
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 3e658cb..f464b27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -575,7 +575,6 @@
     }
   }
 
-
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return container.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 5ae6e76..baef314 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -559,12 +559,8 @@
       // Transfers count number of records from hyperBatch to simple container
       final int copiedRecords = copier.copyRecords(0, count);
       assert copiedRecords == count;
-      for (VectorWrapper<?> v : newContainer) {
-        ValueVector.Mutator m = v.getValueVector().getMutator();
-        m.setValueCount(count);
-      }
       newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      newContainer.setRecordCount(count);
+      newContainer.setValueCount(count);
       // Store all the batches containing limit number of records
       batchBuilder.add(newBatch);
     } while (queueSv4.next());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 45c670b..38fb14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -269,7 +269,7 @@
     for (VectorWrapper<?> w : container) {
       AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
     }
-    container.setValueCount(0);
+    container.setEmpty();
     if (incoming.getRecordCount() > 0) {
       hashAggMemoryManager.update();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c3b504a..586fa32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -186,9 +186,7 @@
     if (!createAggregator()) {
       state = BatchState.DONE;
     }
-    for (VectorWrapper<?> w : container) {
-      w.getValueVector().allocateNew();
-    }
+    container.allocateNew();
 
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index c189367..e3b6070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -66,12 +66,7 @@
     if (recordCount == 0) {
       outgoingSelectionVector.setRecordCount(0);
       outgoingSelectionVector.setBatchActualRecordCount(0);
-
-      // Must allocate vectors, then set count to zero. Allocation
-      // is needed since offset vectors must contain at least one
-      // item (the required value of 0 in index location 0.)
-      outgoing.getContainer().allocateNew();
-      outgoing.getContainer().setValueCount(0);
+      outgoing.getContainer().setEmpty();
       return;
     }
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index eab38ec..cded844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -77,6 +77,7 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
@@ -703,9 +704,7 @@
   private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
       batch.kill(true);
       while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
-        for (VectorWrapper<?> wrapper : batch) {
-          wrapper.getValueVector().clear();
-        }
+        VectorAccessibleUtilities.clear(batch);
         upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
       }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 6b7edd2..c84f954 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -87,10 +87,10 @@
   private int outputRecords;
 
   // We accumulate all the batches on the right side in a hyper container.
-  private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
+  private final ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
 
   // Record count of the individual batches in the right hyper container
-  private LinkedList<Integer> rightCounts = new LinkedList<>();
+  private final LinkedList<Integer> rightCounts = new LinkedList<>();
 
 
   // Generator mapping for the right side
@@ -372,9 +372,7 @@
 
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
-        for (final VectorWrapper<?> vw : left) {
-          container.addOrGet(vw.getField());
-        }
+        container.copySchemaFrom(left);
       }
 
       if (rightUpstream != IterOutcome.NONE) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 9ccae49..ab82769 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -17,6 +17,17 @@
  */
 package org.apache.drill.exec.physical.impl.metadata;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -24,6 +35,7 @@
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
 import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MetadataControllerPOP;
@@ -32,7 +44,6 @@
 import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.WriterPrel;
-import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
@@ -80,17 +91,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 /**
  * Terminal operator for producing ANALYZE statement. This operator is responsible for converting
  * obtained metadata, fetching absent metadata from the Metastore and storing resulting metadata into the Metastore.
@@ -109,9 +109,9 @@
 
   private boolean firstLeft = true;
   private boolean firstRight = true;
-  private boolean finished = false;
-  private boolean finishedRight = false;
-  private int recordCount = 0;
+  private boolean finished;
+  private boolean finishedRight;
+  private int recordCount;
 
   protected MetadataControllerBatch(MetadataControllerPOP popConfig,
       FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
@@ -129,13 +129,10 @@
 
   protected boolean setupNewSchema() {
     container.clear();
-
     container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null);
     container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null);
-
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     container.setEmpty();
-
     return true;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 11d307b..7a61489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -184,5 +184,4 @@
     logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]",
         container, numPartitions, recordCount, partitionIdVector);
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 15962ad..921c92b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
@@ -46,11 +47,12 @@
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.metastore.statistics.Statistic;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- *
  * Example input and output:
- * Schema of incoming batch:
+ * Schema of incoming batch:<pre>
  *    "columns"       : MAP - Column names
  *       "region_id"  : VARCHAR
  *       "sales_city" : VARCHAR
@@ -65,7 +67,7 @@
  *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
  *       "cnt"        : BIGINT - nonnullstatcount(cnt)
  *   .... another map for next stats function ....
- * Schema of outgoing batch:
+ * </pre>Schema of outgoing batch:<pre>
  *    "schema" : BIGINT - Schema number. For each schema change this number is incremented.
  *    "computed" : DATE - What time is it computed?
  *    "columns"       : MAP - Column names
@@ -82,17 +84,19 @@
  *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
  *       "cnt"        : BIGINT - nonnullstatcount(cnt)
  *   .... another map for next stats function ....
+ * </pre>
  */
+
 public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMerge> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsMergeBatch.class);
-  private Map<String, String> functions;
+  private static final Logger logger = LoggerFactory.getLogger(StatisticsMergeBatch.class);
+
+  private final Map<String, String> functions;
   private boolean first = true;
-  private boolean finished = false;
-  private int schema = 0;
-  private int recordCount = 0;
-  private List<String> columnsList = null;
+  private boolean finished;
+  private int schema;
+  private List<String> columnsList;
   private double samplePercent = 100.0;
-  private List<MergedStatistic> mergedStatisticList = null;
+  private final List<MergedStatistic> mergedStatisticList;
 
   public StatisticsMergeBatch(StatisticsMerge popConfig, RecordBatch incoming,
       FragmentContext context) throws OutOfMemoryException {
@@ -115,20 +119,6 @@
   }
 
   /*
-   * Adds the `name` column value vector in the `parent` map vector. These `name` columns are
-   * table columns for which statistics will be computed.
-   */
-  private ValueVector addMapVector(String name, MapVector parent, LogicalExpression expr)
-      throws SchemaChangeException {
-    LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context);
-    Class<? extends ValueVector> vvc =
-        TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(),
-            mle.getMajorType().getMode());
-    ValueVector vector = parent.addOrGet(name, mle.getMajorType(), vvc);
-    return vector;
-  }
-
-  /*
    * Identify the list of fields within a map which are generated by StatisticsMerge. Perform
    * basic sanity check i.e. all maps have the same number of columns and those columns are
    * the same in each map
@@ -229,8 +219,7 @@
         }
       }
     }
-    container.setRecordCount(0);
-    recordCount = 0;
+    container.setEmpty();
     container.buildSchema(incoming.getSchema().getSelectionVectorMode());
   }
 
@@ -238,7 +227,7 @@
    * Determines the MajorType based on the incoming value vector. Please look at the
    * comments above the class definition which describes the incoming/outgoing batch schema
    */
-  private void addVectorToOutgoingContainer(String outStatName, VectorWrapper vw)
+  private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw)
       throws SchemaChangeException {
     // Input map vector
     MapVector inputVector = (MapVector) vw.getValueVector();
@@ -306,9 +295,8 @@
         }
       }
     }
-    ++recordCount;
     // Populate the number of records (1) inside the outgoing batch.
-    container.setRecordCount(1);
+    container.setValueCount(1);
     return IterOutcome.OK;
   }
 
@@ -343,9 +331,7 @@
   }
 
   @Override
-  public void dump() {
-
-  }
+  public void dump() { }
 
   @Override
   public IterOutcome innerNext() {
@@ -404,6 +390,6 @@
 
   @Override
   public int getRecordCount() {
-    return recordCount;
+    return container.getRecordCount();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4471248..a9584bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -25,13 +25,16 @@
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(RemovingRecordBatch.class);
 
   private Copier copier;
 
-  public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+  public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context,
+      RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, incoming);
     logger.debug("Created.");
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 25dae80..ed2b66e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.union;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -68,8 +69,8 @@
 
   private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private UnionAller unionall;
-  private final List<TransferPair> transfers = Lists.newArrayList();
-  private final List<ValueVector> allocationVectors = Lists.newArrayList();
+  private final List<TransferPair> transfers = new ArrayList<>();
+  private final List<ValueVector> allocationVectors = new ArrayList<>();
   private int recordCount;
   private UnionInputIterator unionInputIterator;
 
@@ -341,7 +342,7 @@
   }
 
   private class UnionInputIterator implements Iterator<Pair<IterOutcome, BatchStatusWrappper>> {
-    private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
+    private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
 
     UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
       if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 1715c99..85eceea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -295,6 +295,7 @@
       remainderIndex = 0;
       logger.debug("IterOutcome: EMIT.");
     }
+    rowIdVector.getMutator().setValueCount(outputRecords);
     container.setValueCount(outputRecords);
 
     memoryManager.updateOutgoingStats(outputRecords);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 99bd6d1..72a337a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -64,8 +64,8 @@
  *       "sales_city" : BIGINT - nonnullstatcount(sales_city)
  *       "cnt"        : BIGINT - nonnullstatcount(cnt)
  *   .... another map for next stats function ....
- *
- * Schema of output:
+ * </pre>
+ * Schema of output: <pre>
  *  "schema"           : BIGINT - Schema number. For each schema change this number is incremented.
  *  "computed"         : BIGINT - What time is this computed?
  *  "column"           : column name
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 8793a65..e1ffd7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -17,41 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.validate;
 
-import java.util.IdentityHashMap;
-import java.util.Map;
-
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
-import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
-import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
-import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
-import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
-import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
-import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
-import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
-import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
-import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
-import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
-import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataHashAggBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataStreamAggBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataControllerBatch;
-import org.apache.drill.exec.physical.impl.metadata.MetadataHandlerBatch;
-import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
-import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
-import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
-import org.apache.drill.exec.physical.impl.trace.TraceRecordBatch;
-import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
-import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
-import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
-import org.apache.drill.exec.physical.impl.unpivot.UnpivotMapsRecordBatch;
-import org.apache.drill.exec.physical.impl.window.WindowFrameRecordBatch;
-import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
-import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SimpleVectorWrapper;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -200,89 +166,13 @@
     }
   }
 
-  private enum CheckMode {
-    /** No checking. */
-    NONE,
-    /** Check only batch, container counts. */
-    COUNTS,
-    /** Check vector value counts. */
-    VECTORS
-    };
-
-  private static final Map<Class<? extends CloseableRecordBatch>, CheckMode> checkRules = buildRules();
-
   private final ErrorReporter errorReporter;
 
   public BatchValidator(ErrorReporter errorReporter) {
     this.errorReporter = errorReporter;
   }
 
-  /**
-   * At present, most operators will not pass the checks here. The following
-   * table identifies those that should be checked, and the degree of check.
-   * Over time, this table should include all operators, and thus become
-   * unnecessary.
-   */
-  private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() {
-    Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>();
-    rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
-    rules.put(ScanBatch.class, CheckMode.VECTORS);
-    rules.put(ProjectRecordBatch.class, CheckMode.VECTORS);
-    rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
-    rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
-    rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
-    rules.put(HashAggBatch.class, CheckMode.VECTORS);
-    rules.put(RemovingRecordBatch.class, CheckMode.VECTORS);
-    rules.put(StreamingAggBatch.class, CheckMode.VECTORS);
-    rules.put(RuntimeFilterRecordBatch.class, CheckMode.VECTORS);
-    rules.put(FlattenRecordBatch.class, CheckMode.VECTORS);
-    rules.put(MergeJoinBatch.class, CheckMode.VECTORS);
-    rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS);
-    rules.put(LimitRecordBatch.class, CheckMode.VECTORS);
-    rules.put(MergingRecordBatch.class, CheckMode.VECTORS);
-    rules.put(OrderedPartitionRecordBatch.class, CheckMode.VECTORS);
-    rules.put(RangePartitionRecordBatch.class, CheckMode.VECTORS);
-    rules.put(TraceRecordBatch.class, CheckMode.VECTORS);
-    rules.put(UnionAllRecordBatch.class, CheckMode.VECTORS);
-    rules.put(UnorderedReceiverBatch.class, CheckMode.VECTORS);
-    rules.put(UnpivotMapsRecordBatch.class, CheckMode.VECTORS);
-    rules.put(WindowFrameRecordBatch.class, CheckMode.VECTORS);
-    rules.put(TopNBatch.class, CheckMode.VECTORS);
-    rules.put(HashJoinBatch.class, CheckMode.VECTORS);
-    rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
-    rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataStreamAggBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataHashAggBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataHandlerBatch.class, CheckMode.VECTORS);
-    rules.put(MetadataControllerBatch.class, CheckMode.VECTORS);
-    return rules;
-  }
-
-  private static CheckMode lookup(Object subject) {
-    CheckMode checkMode = checkRules.get(subject.getClass());
-    return checkMode == null ? CheckMode.NONE : checkMode;
-  }
-
   public static boolean validate(RecordBatch batch) {
-    // This is a handy place to trace batches as they flow up
-    // the DAG. Works best for single-threaded runs with few records.
-    // System.out.println(batch.getClass().getSimpleName());
-    // RowSetFormatter.print(batch);
-
-    CheckMode checkMode = lookup(batch);
-
-    // If no rule, don't check this batch.
-
-    if (checkMode == CheckMode.NONE) {
-
-      // As work proceeds, might want to log those batches not checked.
-      // For now, there are too many.
-
-      return true;
-    }
-
-    // All batches that do any checks will at least check counts.
-
     ErrorReporter reporter = errorReporter(batch);
     int rowCount = batch.getRecordCount();
     int valueCount = rowCount;
@@ -340,9 +230,7 @@
         break;
       }
     }
-    if (checkMode == CheckMode.VECTORS) {
-      new BatchValidator(reporter).validateBatch(batch, valueCount);
-    }
+    new BatchValidator(reporter).validateBatch(batch, valueCount);
     return reporter.errorCount() == 0;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 6ed004f..07a5c76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.window;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -47,7 +48,6 @@
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +64,7 @@
   private List<WindowDataBatch> batches;
 
   private WindowFramer[] framers;
-  private final List<WindowFunction> functions = Lists.newArrayList();
+  private final List<WindowFunction> functions = new ArrayList<>();
 
   private boolean noMoreBatches; // true when downstream returns NONE
   private BatchSchema schema;
@@ -75,7 +75,7 @@
       RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
-    batches = Lists.newArrayList();
+    batches = new ArrayList<>();
   }
 
   /**
@@ -260,17 +260,15 @@
 
     logger.trace("creating framer(s)");
 
-    List<LogicalExpression> keyExprs = Lists.newArrayList();
-    List<LogicalExpression> orderExprs = Lists.newArrayList();
+    List<LogicalExpression> keyExprs = new ArrayList<>();
+    List<LogicalExpression> orderExprs = new ArrayList<>();
     boolean requireFullPartition = false;
 
     boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate
     boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate
 
     // all existing vectors will be transferred to the outgoing container in framer.doWork()
-    for (VectorWrapper<?> wrapper : batch) {
-      container.addOrGet(wrapper.getField());
-    }
+    container.copySchemaFrom(batch);
 
     // add aggregation vectors to the container, and materialize corresponding expressions
     for (NamedExpression ne : popConfig.getAggregations()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index f51f521..03e8ffa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -21,10 +21,8 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-// TODO javadoc
 public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
   // TODO are these <?> related in any way? Should they be the same one?
-  // TODO javadoc
   VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 1cfc61d..3796e5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -553,4 +553,10 @@
     // in the offset vectors that need it.
     setValueCount(0);
   }
+
+  public void copySchemaFrom(VectorAccessible other) {
+    for (VectorWrapper<?> wrapper : other) {
+      addOrGet(wrapper.getField());
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index da42b27..0ab4181 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -79,8 +79,8 @@
    * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem,
-      final List<SchemaPath> columns) throws OutOfMemoryException {
+  public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem,
+      List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, inputPath, null, fileSystem, columns);
   }
 
@@ -137,15 +137,15 @@
   }
 
   @Override
-  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try{
       if (hadoopPath != null) {
-        this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
+        stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
       }
 
-      this.writer = new VectorContainerWriter(output, unionEnabled);
+      writer = new VectorContainerWriter(output, unionEnabled);
       if (isSkipQuery()) {
-        this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, enableEscapeAnyChar);
+        jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, enableEscapeAnyChar);
       } else {
         this.jsonReader = new JsonReader.Builder(fragmentContext.getManagedBuffer())
             .schemaPathColumns(ImmutableList.copyOf(getColumns()))
@@ -157,7 +157,7 @@
             .build();
       }
       setupParser();
-    } catch (final Exception e){
+    } catch (Exception e){
       handleAndRaise("Failure reading JSON file", e);
     }
   }
@@ -182,7 +182,7 @@
     int columnNr = -1;
 
     if (e instanceof JsonParseException) {
-      final JsonParseException ex = (JsonParseException) e;
+      JsonParseException ex = (JsonParseException) e;
       message = ex.getOriginalMessage();
       columnNr = ex.getLocation().getColumnNr();
     }
@@ -226,7 +226,8 @@
           }
           ++parseErrorCount;
           if (printSkippedMalformedJSONRecordLineNumber) {
-            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount));
+            logger.debug("Error parsing JSON in {}: line: {}",
+                hadoopPath.getName(), recordCount + parseErrorCount);
           }
           if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) {
             break;
@@ -254,8 +255,9 @@
 
   @Override
   public void close() throws Exception {
-    if(stream != null) {
+    if (stream != null) {
       stream.close();
+      stream = null;
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 79aa1d3..04bc67d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -81,6 +81,9 @@
 
   @Test
   public void schemaChange() throws Exception {
+    // Verifies that the schema change does not cause a
+    // crash. A pretty minimal test.
+    // TODO: Verify actual results.
     test("select b from dfs.`vector/complex/writer/schemaChange/`");
   }
 
@@ -267,12 +270,15 @@
 
   @Test
   public void testAllTextMode() throws Exception {
-    test("alter system set `store.json.all_text_mode` = true");
-    String[] queries = {"select * from cp.`store/json/schema_change_int_to_string.json`"};
-    long[] rowCounts = {3};
-    String filename = "/store/json/schema_change_int_to_string.json";
-    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
-    test("alter system set `store.json.all_text_mode` = false");
+    try {
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      String[] queries = {"select * from cp.`store/json/schema_change_int_to_string.json`"};
+      long[] rowCounts = {3};
+      String filename = "/store/json/schema_change_int_to_string.json";
+      runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
@@ -293,58 +299,87 @@
 
   @Test
   public void testNullWhereListExpected() throws Exception {
-    test("alter system set `store.json.all_text_mode` = true");
-    String[] queries = {"select * from cp.`store/json/null_where_list_expected.json`"};
-    long[] rowCounts = {3};
-    String filename = "/store/json/null_where_list_expected.json";
-    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
-    test("alter system set `store.json.all_text_mode` = false");
+    try {
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      String[] queries = {"select * from cp.`store/json/null_where_list_expected.json`"};
+      long[] rowCounts = {3};
+      String filename = "/store/json/null_where_list_expected.json";
+      runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+    }
+    finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
   public void testNullWhereMapExpected() throws Exception {
-    test("alter system set `store.json.all_text_mode` = true");
-    String[] queries = {"select * from cp.`store/json/null_where_map_expected.json`"};
-    long[] rowCounts = {3};
-    String filename = "/store/json/null_where_map_expected.json";
-    runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
-    test("alter system set `store.json.all_text_mode` = false");
+    try {
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true);
+      String[] queries = {"select * from cp.`store/json/null_where_map_expected.json`"};
+      long[] rowCounts = {3};
+      String filename = "/store/json/null_where_map_expected.json";
+      runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts);
+    }
+    finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
   public void ensureProjectionPushdown() throws Exception {
-    // Tests to make sure that we are correctly eliminating schema changing columns.  If completes, means that the projection pushdown was successful.
-    test("alter system set `store.json.all_text_mode` = false; "
-        + "select  t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 "
-        + "from cp.`store/json/schema_change_int_to_string.json` t");
+    try {
+      // Tests to make sure that we are correctly eliminating schema changing
+      // columns. If completes, means that the projection pushdown was
+      // successful.
+      test("alter system set `store.json.all_text_mode` = false; "
+          + "select  t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 "
+          + "from cp.`store/json/schema_change_int_to_string.json` t");
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
-  // The project pushdown rule is correctly adding the projected columns to the scan, however it is not removing
-  // the redundant project operator after the scan, this tests runs a physical plan generated from one of the tests to
-  // ensure that the project is filtering out the correct data in the scan alone
+  // The project pushdown rule is correctly adding the projected columns to the
+  // scan, however it is not removing the redundant project operator after the
+  // scan, this tests runs a physical plan generated from one of the tests to
+  // ensure that the project is filtering out the correct data in the scan alone.
   @Test
   public void testProjectPushdown() throws Exception {
-    String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8).read()};
-    long[] rowCounts = {3};
-    String filename = "/store/json/schema_change_int_to_string.json";
-    test("alter system set `store.json.all_text_mode` = false");
-    runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
+    try {
+      String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile(
+          "/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8).read()};
+      String filename = "/store/json/schema_change_int_to_string.json";
+      alterSession(ExecConstants.JSON_ALL_TEXT_MODE, false);
+      long[] rowCounts = {3};
+      runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
 
-    List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
-    assertEquals(1, results.size());
-    // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`"
+      List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
+      assertEquals(1, results.size());
+      // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`"
 
-    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-    QueryDataBatch batch = results.get(0);
-    assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+      RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+      QueryDataBatch batch = results.get(0);
+      assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
 
-    // this used to be five.  It is now three.  This is because the plan doesn't have a project.
-    // Scanners are not responsible for projecting non-existent columns (as long as they project one column)
-    assertEquals(3, batchLoader.getSchema().getFieldCount());
-    testExistentColumns(batchLoader);
+      // this used to be five. It is now four. This is because the plan doesn't
+      // have a project. Scanners are not responsible for projecting non-existent
+      // columns (as long as they project one column)
+      //
+      // That said, the JSON format plugin does claim it can do project
+      // push-down, which means it will ensure columns for any column
+      // mentioned in the project list, in a form consistent with the schema
+      // path. In this case, `non_existent`.`nested`.`field` appears in
+      // the query. But, even more oddly, the missing field is inserted only
+      // if all text mode is true, omitted if all text mode is false.
+      // Seems overly complex.
+      assertEquals(3, batchLoader.getSchema().getFieldCount());
+      testExistentColumns(batchLoader);
 
-    batch.release();
-    batchLoader.clear();
+      batch.release();
+      batchLoader.clear();
+    } finally {
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+    }
   }
 
   @Test
@@ -360,32 +395,32 @@
 
   private void testExistentColumns(RecordBatchLoader batchLoader) throws SchemaChangeException {
     VectorWrapper<?> vw = batchLoader.getValueAccessorById(
-        RepeatedBigIntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds() //
+        RepeatedBigIntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds()
     );
     assertEquals("[1]", vw.getValueVector().getAccessor().getObject(0).toString());
     assertEquals("[5]", vw.getValueVector().getAccessor().getObject(1).toString());
     assertEquals("[5,10,15]", vw.getValueVector().getAccessor().getObject(2).toString());
 
     vw = batchLoader.getValueAccessorById(
-        IntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_1")).getFieldIds() //
+        IntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_1")).getFieldIds()
     );
     assertNull(vw.getValueVector().getAccessor().getObject(0));
     assertEquals(2l, vw.getValueVector().getAccessor().getObject(1));
     assertEquals(5l, vw.getValueVector().getAccessor().getObject(2));
 
     vw = batchLoader.getValueAccessorById(
-        IntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_2")).getFieldIds() //
+        IntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_2")).getFieldIds()
     );
     assertNull(vw.getValueVector().getAccessor().getObject(0));
     assertNull(vw.getValueVector().getAccessor().getObject(1));
     assertEquals(3l, vw.getValueVector().getAccessor().getObject(2));
 
     vw = batchLoader.getValueAccessorById(
-        RepeatedBigIntVector.class, //
-        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", "inner_1")).getFieldIds() //
+        RepeatedBigIntVector.class,
+        batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", "inner_1")).getFieldIds()
     );
     assertEquals("[]", vw.getValueVector().getAccessor().getObject(0).toString());
     assertEquals("[1,2,3]", vw.getValueVector().getAccessor().getObject(1).toString());
@@ -440,7 +475,7 @@
                       )
               ).go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -457,7 +492,7 @@
               .baselineValues(13L, "BIGINT")
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -477,7 +512,7 @@
               .baselineValues(3L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -495,7 +530,7 @@
               .baselineValues(9L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -512,7 +547,7 @@
               .baselineValues(11.0)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -536,7 +571,7 @@
               .baselineValues(20000L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -565,7 +600,7 @@
               .baselineValues(20000L)
               .go();
     } finally {
-      testNoResult("alter session set `exec.enable_union_type` = false");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -628,7 +663,7 @@
         .go();
 
     } finally {
-      testNoResult("alter session set `store.json.all_text_mode` = false");
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
     }
   }