DRILL-7724: Refactor metadata controller batch

Also changed for (;;) infinite loops to
while (true) as preferred by IntelliJ.
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 4e7fde9..5f7061e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -146,7 +146,7 @@
     } catch (Throwable e) {
       // Unwrap exception
       Throwable ex = e;
-      for (;;) {
+      while (true) {
         // Case for failing on an invalid cached connection
         if (ex instanceof MetaException ||
             // Case for a timed-out impersonated connection, and
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
index 8905ce3..d388d5c 100644
--- a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java
@@ -42,8 +42,8 @@
  */
 
 public class YarnRMClient {
-  private YarnConfiguration conf;
-  private YarnClient yarnClient;
+  private final YarnConfiguration conf;
+  private final YarnClient yarnClient;
 
   /**
    * Application ID. Semantics are such that each session of Drill-on-YARN works
@@ -126,7 +126,7 @@
     ApplicationReport appReport;
     YarnApplicationState appState;
     ApplicationAttemptId attemptId;
-    for (;;) {
+    while (true) {
       appReport = getAppReport();
       appState = appReport.getYarnApplicationState();
       attemptId = appReport.getCurrentApplicationAttemptId();
@@ -160,7 +160,7 @@
   public void waitForCompletion() throws YarnClientException {
     ApplicationReport appReport;
     YarnApplicationState appState;
-    for (;;) {
+    while (true) {
       appReport = getAppReport();
       appState = appReport.getYarnApplicationState();
       if (appState == YarnApplicationState.FINISHED
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
index 91956e0..3d074bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java
@@ -32,8 +32,6 @@
 @JsonTypeName("streaming-aggregate")
 public class StreamingAggregate extends AbstractSingle {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregate.class);
-
   private final List<NamedExpression> keys;
   private final List<NamedExpression> exprs;
 
@@ -68,5 +66,4 @@
   public int getOperatorType() {
     return CoreOperatorType.STREAMING_AGGREGATE_VALUE;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 2f0c9e2..cc1494d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -70,7 +70,6 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
-import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -256,10 +255,7 @@
 
     incomingSchema = incoming.getSchema();
     createAggregator();
-    for (VectorWrapper<?> w : container) {
-      AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
-    }
-    container.setEmpty();
+    container.allocatePrecomputedChildCount(0, 0, 0);
     if (incoming.getRecordCount() > 0) {
       hashAggMemoryManager.update();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 7886a5e..b1e332e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -219,7 +219,7 @@
 
           if (first && getKeyExpressions().size() == 0) {
             // if we have a straight aggregate and empty input batch, we need to handle it in a different way
-            // Wewant to produce the special batch only if we got a NONE as the first outcome after
+            // We want to produce the special batch only if we got a NONE as the first outcome after
             // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled
             // the case of the empty batch
             constructSpecialBatch();
@@ -259,7 +259,7 @@
         && aggregator.previousBatchProcessed()) {
         lastKnownOutcome = incoming.next();
         if (!first ) {
-          //Setup needs to be called again. During setup, generated code saves a reference to the vectors
+          // Setup needs to be called again. During setup, generated code saves a reference to the vectors
           // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to
           // the vectors  does not have to be done at each call to eval. However, after an EMIT is seen,
           // the vectors are replaced and the reference to the old vectors is no longer valid
@@ -290,7 +290,7 @@
         ExternalSortBatch.releaseBatches(incoming);
         return returnOutcome;
       case RETURN_AND_RESET:
-        //WE could have got a string of batches, all empty, until we hit an emit
+        // We could have got a string of batches, all empty, until we hit an emit
         if (firstBatchForDataSet && getKeyExpressions().size() == 0 && recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to handle it in a different way
           constructSpecialBatch();
@@ -371,10 +371,12 @@
   }
 
   /**
-   * Method is invoked when we have a straight aggregate (no group by expression) and our input is empty.
-   * In this case we construct an outgoing batch with record count as 1. For the nullable vectors we don't set anything
-   * as we want the output to be NULL. For the required vectors (only for count()) we set the value to be zero since
-   * we don't zero out our buffers initially while allocating them.
+   * Invoked when we have a straight aggregate (no group by expression) and our
+   * input is empty. In this case we construct an outgoing batch with record
+   * count as 1. For the nullable vectors we don't set anything as we want the
+   * output to be NULL. For the required vectors (only for count()) we set the
+   * value to be zero since we don't zero out our buffers initially while
+   * allocating them.
    */
   private void constructSpecialBatch() {
     int exprIndex = 0;
@@ -426,9 +428,15 @@
 
   protected StreamingAggregator createAggregatorInternal() {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
-    cg.getCodeGenerator().plainJavaCapable(true);
+    // Streaming agg no longer plain Java capable. Stats generates code
+    // that fails when compiled normally.
+    //     cannot override resetValues() in org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate
+    //     public boolean resetValues()
+    //     ^
+    //     overridden method does not throw org.apache.drill.exec.exception.SchemaChangeException (compiler.err.override.meth.doesnt.throw)
+    // cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //cg.getCodeGenerator().saveCodeForDebugging(true);
+    // cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new LogicalExpression[getKeyExpressions().size()];
@@ -460,7 +468,7 @@
         continue;
       }
 
-      /* Populate the complex writers for complex exprs */
+      // Populate the complex writers for complex exprs
       if (expr instanceof DrillFuncHolderExpr &&
           ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
         // Need to process ComplexWriter function evaluation.
@@ -651,7 +659,8 @@
 
   @Override
   public void dump() {
-    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", container, popConfig, aggregator, incomingSchema);
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
+        container, popConfig, aggregator, incomingSchema);
   }
 
   @VisibleForTesting
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4a6822d..c97e19e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -173,15 +173,15 @@
         return AggOutcome.CLEANUP_AND_RETURN;
       }
 
-      outside: while(true) {
+      outside: while (true) {
         // loop through existing records, adding as necessary.
-        if(!processRemainingRecordsInBatch()) {
+        if (!processRemainingRecordsInBatch()) {
           // output batch is full. Return.
           return setOkAndReturn(outerOutcome);
         }
         // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all
         // the rows in incoming batch
-        if(outerOutcome == EMIT) {
+        if (outerOutcome == EMIT) {
           // output the last record
           outputToBatch(previousIndex);
           resetIndex();
@@ -492,8 +492,7 @@
   }
 
   @Override
-  public void cleanup() {
-  }
+  public void cleanup() { }
 
   @Override
   public String toString() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 1bcb1ca..dce5822 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -36,6 +36,7 @@
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataControllerContext;
 import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils;
 import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -105,18 +106,16 @@
 public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataControllerPOP> {
   private static final Logger logger = LoggerFactory.getLogger(MetadataControllerBatch.class);
 
+  enum State { RIGHT, LEFT, WRITE, FINISHED }
+
   private final Tables tables;
   private final TableInfo tableInfo;
   private final Map<String, MetadataInfo> metadataToHandle;
-  private final StatisticsRecordCollector statisticsCollector;
-  private final List<TableMetadataUnit> metadataUnits;
+  private final StatisticsRecordCollector statisticsCollector = new StatisticsCollectorImpl();
+  private final List<TableMetadataUnit> metadataUnits = new ArrayList<>();
   private final ColumnNamesOptions columnNamesOptions;
 
-  private boolean firstLeft = true;
-  private boolean firstRight = true;
-  private boolean finished;
-  private boolean finishedRight;
-  private int recordCount;
+  private State state = State.RIGHT;
 
   protected MetadataControllerBatch(MetadataControllerPOP popConfig,
       FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
@@ -127,95 +126,59 @@
         ? null
         : popConfig.getContext().metadataToHandle().stream()
             .collect(Collectors.toMap(MetadataInfo::identifier, Function.identity()));
-    this.metadataUnits = new ArrayList<>();
-    this.statisticsCollector = new StatisticsCollectorImpl();
     this.columnNamesOptions = new ColumnNamesOptions(context.getOptions());
   }
 
-  protected boolean setupNewSchema() {
-    container.clear();
-    container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null);
-    container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null);
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    container.setEmpty();
-    return true;
-  }
-
   @Override
   public IterOutcome innerNext() {
-    IterOutcome outcome;
-    boolean finishedLeft;
-    if (finished) {
-      return IterOutcome.NONE;
-    }
+    while (state != State.FINISHED) {
+      switch (state) {
+        case RIGHT: {
 
-    if (!finishedRight) {
-      outcome = handleRightIncoming();
-      if (outcome != null) {
-        return outcome;
-      }
-    }
-
-    outer:
-    while (true) {
-      outcome = next(0, left);
-      switch (outcome) {
-        case NONE:
-          // all incoming data was processed when returned OK_NEW_SCHEMA
-          finishedLeft = !firstLeft;
-          break outer;
-        case NOT_YET:
-          return outcome;
-        case OK_NEW_SCHEMA:
-          if (firstLeft) {
-            firstLeft = false;
-            if (!setupNewSchema()) {
-              outcome = IterOutcome.OK;
-            }
-            IterOutcome out = handleLeftIncoming();
-            if (out != IterOutcome.OK) {
-              return out;
-            }
+          // Can only return NOT_YET
+          IterOutcome outcome = handleRightIncoming();
+          if (outcome != null) {
             return outcome;
           }
-          //fall through
-        case OK:
-          assert !firstLeft : "First batch should be OK_NEW_SCHEMA";
-          IterOutcome out = handleLeftIncoming();
-          if (out != IterOutcome.OK) {
-            return out;
+          break;
+        }
+        case LEFT: {
+
+          // Can only return NOT_YET
+          IterOutcome outcome = handleLeftIncoming();
+          if (outcome != null) {
+            return outcome;
           }
           break;
+        }
+        case WRITE:
+          writeToMetastore();
+          createSummary();
+          state = State.FINISHED;
+          return IterOutcome.OK_NEW_SCHEMA;
+
+        case FINISHED:
+          break;
+
         default:
-          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
+          throw new IllegalStateException(state.name());
       }
     }
-
-    if (finishedLeft) {
-      IterOutcome out = writeToMetastore();
-      finished = true;
-      return out;
-    }
-    return outcome;
+    return IterOutcome.NONE;
   }
 
   private IterOutcome handleRightIncoming() {
-    IterOutcome outcome;
     outer:
     while (true) {
-      outcome = next(0, right);
+      IterOutcome outcome = next(0, right);
       switch (outcome) {
         case NONE:
-          // all incoming data was processed
-          finishedRight = true;
+          state = State.LEFT;
           break outer;
         case NOT_YET:
           return outcome;
         case OK_NEW_SCHEMA:
-          firstRight = false;
-          //fall through
         case OK:
-          assert !firstRight : "First batch should be OK_NEW_SCHEMA";
           appendStatistics(statisticsCollector);
           break;
         default:
@@ -226,14 +189,30 @@
   }
 
   private IterOutcome handleLeftIncoming() {
-    metadataUnits.addAll(getMetadataUnits(left.getContainer()));
-    return IterOutcome.OK;
+    while (true) {
+      IterOutcome outcome = next(0, left);
+      switch (outcome) {
+        case NONE:
+          // all incoming data was processed when returned OK_NEW_SCHEMA
+          state = State.WRITE;
+          return null;
+        case NOT_YET:
+          return outcome;
+        case OK_NEW_SCHEMA:
+        case OK:
+          metadataUnits.addAll(getMetadataUnits(left.getContainer()));
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported upstream state " + outcome);
+      }
+    }
   }
 
-  private IterOutcome writeToMetastore() {
-    FilterExpression deleteFilter = popConfig.getContext().tableInfo().toFilter();
+  private void writeToMetastore() {
+    MetadataControllerContext mdContext = popConfig.getContext();
+    FilterExpression deleteFilter = mdContext.tableInfo().toFilter();
 
-    for (MetadataInfo metadataInfo : popConfig.getContext().metadataToRemove()) {
+    for (MetadataInfo metadataInfo : mdContext.metadataToRemove()) {
       deleteFilter = FilterExpression.and(deleteFilter,
           FilterExpression.equal(MetastoreColumn.METADATA_KEY, metadataInfo.key()));
     }
@@ -246,8 +225,7 @@
         .build());
     }
 
-    MetastoreTableInfo metastoreTableInfo = popConfig.getContext().metastoreTableInfo();
-
+    MetastoreTableInfo metastoreTableInfo = mdContext.metastoreTableInfo();
     if (tables.basicRequests().hasMetastoreTableInfoChanged(metastoreTableInfo)) {
       throw UserException.executionError(null)
         .message("Metadata for table [%s] was changed before analyze is finished", tableInfo.name())
@@ -256,7 +234,10 @@
 
     modify.overwrite(metadataUnits)
         .execute();
+  }
 
+  private void createSummary() {
+    container.clear();
     BitVector bitVector =
         container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null);
     VarCharVector varCharVector =
@@ -272,11 +253,8 @@
             popConfig.getContext().tableInfo().workspace(),
             popConfig.getContext().tableInfo().name()).getBytes());
 
-    bitVector.getMutator().setValueCount(1);
-    varCharVector.getMutator().setValueCount(1);
-    container.setRecordCount(++recordCount);
-
-    return IterOutcome.OK;
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setValueCount(1);
   }
 
   private List<TableMetadataUnit> getMetadataUnits(VectorContainer container) {
@@ -696,7 +674,6 @@
       default:
         return objectReader.getObject();
     }
-
   }
 
   private Set<Path> getIncomingLocations(TupleReader reader) {
@@ -748,6 +725,6 @@
 
   @Override
   public int getRecordCount() {
-    return recordCount;
+    return container.getRecordCount();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index 5081b27..b6c77ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -246,7 +246,7 @@
   }
 
   private void nextAction(boolean readSchema) {
-    for (;;) {
+    while (true) {
 
       // If have a reader, read a batch
       if (readerState != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 42ea36c..915c1f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -396,7 +396,7 @@
     // Loop over all input batches
 
     IterOutcome result = OK;
-    loop: for (;;) {
+    loop: while (true) {
       result = loadBatch();
       switch (result) {
       case NONE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java
index e037600..cc8f6bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java
@@ -539,7 +539,7 @@
     // a single last pass.
 
     loop:
-    for (;;) {
+    while (true) {
       MergeTask task = memManager.consolidateBatches(
           allocator.getAllocatedMemory(),
           bufferedBatches.size(),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 97c60f3..00ab539 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -48,35 +48,37 @@
 public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
 
   public enum OperatorPhase {
-    PHASE_1of1(false, false, false, "Single"),
-    PHASE_1of2(true, true, false, "1st"),
-    PHASE_2of2(true, false, true, "2nd");
 
-    private boolean hasTwo;
-    private boolean is1st;
-    private boolean is2nd;
+    // Single phase aggregate
+    PHASE_1of1("Single"),
+
+    // Distributed aggregate: partitioned first phase
+    PHASE_1of2("1st"),
+
+    // Distibuted aggregate: non-partitioned overall aggregation
+    // phase
+    PHASE_2of2("2nd");
+
     private String name;
 
-    OperatorPhase(boolean hasTwo,
-                  boolean is1st,
-                  boolean is2nd,
-                  String name) {
-      this.hasTwo = hasTwo;
-      this.is1st = is1st;
-      this.is2nd = is2nd;
+    OperatorPhase(String name) {
       this.name = name;
     }
 
     public boolean hasTwo() {
-      return hasTwo;
+      return this != PHASE_1of1;
     }
 
     public boolean is1st() {
-      return is1st;
+      return this == PHASE_1of2;
     }
 
     public boolean is2nd() {
-      return is2nd;
+      return this == PHASE_2of2;
+    }
+
+    public boolean isFinal() {
+      return this != PHASE_1of2;
     }
 
     public String getName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 4ec0b8d..17d7b53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -30,6 +30,7 @@
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
@@ -573,4 +574,13 @@
       add(TypeHelper.getNewVector(field, allocator));
     }
   }
+
+  public void allocatePrecomputedChildCount(int valueCount,
+      int bytesPerValue, int childValCount) {
+    for (VectorWrapper<?> w : wrappers) {
+      AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(),
+          valueCount, bytesPerValue, childValCount);
+    }
+    setEmpty();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java
index 963b8c5..d6f5912 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java
@@ -54,7 +54,7 @@
   @Override
   public void parse(TokenIterator tokenizer) {
     arrayListener.onStart();
-    top: for (;;) {
+    top: while (true) {
       // Position: [ (value, )* ^ ?
       JsonToken token = tokenizer.requireNext();
       switch (token) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
index 7d5131b..d705f00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java
@@ -55,7 +55,7 @@
   public void parseTail(TokenIterator tokenizer) {
 
     // Parse (field: value)* }
-    for (;;) {
+    while (true) {
       JsonToken token = tokenizer.requireNext();
       switch (token) {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index 23ff3e1..4d46a46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -240,7 +240,7 @@
       // Only occurs for an empty document
       return false;
     }
-    for (;;) {
+    while (true) {
       try {
         return rootState.parseRoot(tokenizer);
       } catch (RecoverableJsonException e) {
@@ -264,8 +264,8 @@
   private boolean recover() {
     logger.warn("Attempting recovery from JSON syntax error. " + tokenizer.context());
     boolean firstAttempt = true;
-    for (;;) {
-      for (;;) {
+    while (true) {
+      while (true) {
         try {
           if (parser.isClosed()) {
             throw errorFactory().unrecoverableError();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java
index 15fc128..8819e2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java
@@ -74,7 +74,7 @@
     // Accept value* ]
 
     boolean first = true;
-    for (;;) {
+    while (true) {
       JsonToken token = tokenizer.requireNext();
       if (token == JsonToken.END_ARRAY) {
         json.append(tokenizer.textValue());
@@ -93,7 +93,7 @@
     // Accept (field: value)* }
 
     boolean first = true;
-    for (;;) {
+    while (true) {
       JsonToken token = tokenizer.requireNext();
       if (token == JsonToken.END_OBJECT) {
         json.append(tokenizer.textValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java
index 226cc29..ce331a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java
@@ -101,8 +101,7 @@
     listener.onStart();
 
     // Parse (field: value)* }
-
-    top: for (;;) {
+    top: while (true) {
       JsonToken token = tokenizer.requireNext();
       // Position: { (key: value)* ? ^
       switch (token) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
index a5181f1..b93f29e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java
@@ -87,7 +87,7 @@
   }
 
   private boolean parseToElement(TokenIterator tokenizer, int level) throws MessageContextException {
-    for (;;) {
+    while (true) {
       JsonToken token = tokenizer.requireNext();
       switch (token) {
         case FIELD_NAME:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
index 58d8416..7f2e060 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
@@ -77,7 +77,7 @@
      * as the user changes system options. This value captures the value
      * calculated at the time that this lease was granted.
      */
-    private long queryMemory;
+    private final long queryMemory;
 
     public DistributedQueueLease(QueryId queryId, String queueName,
                     DistributedLease lease, long queryMemory) {
@@ -197,9 +197,9 @@
   }
 
   private long memoryPerNode;
-  private SystemOptionManager optionManager;
+  private final SystemOptionManager optionManager;
   private ConfigSet configSet;
-  private ClusterCoordinator clusterCoordinator;
+  private final ClusterCoordinator clusterCoordinator;
   private long nextRefreshTime;
   private long memoryPerSmallQuery;
   private long memoryPerLargeQuery;
@@ -335,7 +335,7 @@
 
   private void release(QueueLease lease) {
     DistributedQueueLease theLease = (DistributedQueueLease) lease;
-    for (;;) {
+    while (true) {
       try {
         theLease.lease.close();
         theLease.lease = null;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
index 646c3f4..13780de 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
@@ -412,7 +412,7 @@
     // Equivalent of an entire operator run
 
     int start = 1;
-    for (;;) {
+    while (true) {
 
       // Equivalent of operator next() method
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
index 975811e..da4705b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java
@@ -1220,7 +1220,7 @@
       // away the last row because the row set abstraction does not
       // implement vector overflow other than throwing an exception.
 
-      for (;;) {
+      while (true) {
         writer.scalar(0).setInt(count);
         writer.scalar(1).setString(varCharValue);
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java
index 1e2617b..5189db8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java
@@ -32,7 +32,7 @@
     builder.configBuilder().put(ExecConstants.HTTP_ENABLE, true);
     try {
       startCluster(builder);
-      for (;;) {
+      while (true) {
         Thread.sleep(1000);
       }
     } catch (Exception e) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index bbf684e..442fd8e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -94,7 +94,7 @@
     }
   }
 
-  private ClusterFixture cluster;
+  private final ClusterFixture cluster;
   private DrillClient client;
 
   public ClientFixture(ClientBuilder builder) throws RpcException {
@@ -326,7 +326,7 @@
     public String parseNext() throws IOException {
       boolean eof = false;
       StringBuilder buf = new StringBuilder();
-      for (;;) {
+      while (true) {
         int c = in.read();
         if (c == -1) {
           eof = true;
@@ -339,7 +339,7 @@
         if (c == '"' || c == '\'' || c == '`') {
           int quote = c;
           boolean escape = false;
-          for (;;) {
+          while (true) {
             c = in.read();
             if (c == -1) {
               throw new IllegalArgumentException("Mismatched quote: " + (char) c);
@@ -363,7 +363,7 @@
   public int exec(Reader in) throws IOException {
     StatementParser parser = new StatementParser(in);
     int count = 0;
-    for (;;) {
+    while (true) {
       String stmt = parser.parseNext();
       if (stmt == null) {
         logger.debug("----");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 6b0641d..4828f6c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -709,8 +709,7 @@
     int batchCount = 0;
     QueryId queryId = null;
     QueryState state;
-    loop:
-    for (;;) {
+    loop: while (true) {
       QueryEvent event = listener.get();
       switch (event.type)
       {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index f3c436d..3a3fe67 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -32,7 +32,7 @@
  */
 
 public class QueryResultSet {
-  private BufferingQueryEventListener listener;
+  private final BufferingQueryEventListener listener;
   private boolean eof;
   private int recordCount = 0;
   private int batchCount = 0;
@@ -58,7 +58,7 @@
     if (eof) {
       return null;
     }
-    for (;;) {
+    while (true) {
       QueryEvent event = listener.get();
       switch (event.type)
       {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index c8d8459..acc5056 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -52,7 +52,7 @@
 
   @Override
   public boolean hasNext() {
-    for (;;) {
+    while (true) {
       QueryEvent event = listener.get();
       state = event.state;
       batch = null;