DRILL-7675: Work around for partitions sender memory use

Adds an ad-hoc system/session option to limit partition sender
memory use. See DRILL-7686 for the underlying issue.

Also includes code cleanup and diagnostic tools.

closes #2047
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c421c73..3a16353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -162,7 +162,6 @@
   public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
 
 
-
   // Hash Aggregate Options
   public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
   public static final LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128,
@@ -189,6 +188,12 @@
   public static final BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY,
       new OptionDescription("Hash Aggregates ignore memory limits when enabled (true). When disabled (false), Hash Aggregates fail when memory is set too low."));
 
+  // Partitioner options
+  public static final String PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY = "exec.partition.mem_throttle";
+  public static final LongValidator PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR =
+      new RangeLongValidator(PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY, 0, Integer.MAX_VALUE,
+      new OptionDescription("Linearly reduces partition sender buffer row count after this number of receivers. Default is 0 (disabled). (Since Drill 1.18)"));
+
   public static final String SSL_PROVIDER = "drill.exec.ssl.provider"; // valid values are "JDK", "OPENSSL" // default JDK
   public static final String SSL_PROTOCOL = "drill.exec.ssl.protocol"; // valid values are SSL, SSLV2, SSLV3, TLS, TLSV1, TLSv1.1, TLSv1.2(default)
   public static final String SSL_KEYSTORE_TYPE = "drill.exec.ssl.keyStoreType";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
index b4790c8..2c580da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassBuilder.java
@@ -29,6 +29,8 @@
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.codehaus.commons.compiler.CompileException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements the "plain Java" method of code generation and
@@ -76,10 +78,9 @@
  * The setting to prefer plain Java is ignored for any remaining generated
  * classes not marked as plain Java capable.
  */
-
 public class ClassBuilder {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassBuilder.class);
+  private static final Logger logger = LoggerFactory.getLogger(ClassBuilder.class);
   public static final String CODE_DIR_OPTION = CodeCompiler.COMPILE_BASE + ".code_dir";
 
   private final DrillConfig config;
@@ -95,7 +96,6 @@
     // point your debugger to the directory set below, and you
     // can step into the code for debugging. Code is not saved
     // be default because doing so is expensive and unnecessary.
-
     codeDir = new File(config.getString(CODE_DIR_OPTION));
   }
 
@@ -109,7 +109,6 @@
    * @return the class that the code generator defines
    * @throws ClassTransformationException
    */
-
   public Class<?> getImplementationClass(CodeGenerator<?> cg) throws ClassTransformationException {
     try {
       return compileClass(cg);
@@ -133,23 +132,19 @@
     final long t1 = System.nanoTime();
 
     // Get the plain Java code.
-
     String code = cg.getGeneratedCode();
 
     // Get the class names (dotted, file path, etc.)
-
     String className = cg.getMaterializedClassName();
     ClassTransformer.ClassNames name = new ClassTransformer.ClassNames(className);
 
     // A key advantage of this method is that the code can be
     // saved and debugged, if needed.
-
     if (cg.isCodeToBeSaved()) {
       saveCode(code, name);
     }
 
     // Compile the code and load it into a class loader.
-
     CachedClassLoader classLoader = new CachedClassLoader();
     ClassCompilerSelector compilerSelector = new ClassCompilerSelector(classLoader, config, options);
     Map<String,byte[]> results = compilerSelector.compile(name, code);
@@ -165,7 +160,6 @@
                   (System.nanoTime() - t1 + 500_000) / 1_000_000);
 
     // Get the class from the class loader.
-
     try {
       return classLoader.findClass(className);
     } catch (ClassNotFoundException e) {
@@ -184,7 +178,6 @@
    * @param code the source code
    * @param name the class name
    */
-
   private void saveCode(String code, ClassNames name) {
 
     String pathName = name.slash + ".java";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 843d20b..00d5bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -945,7 +945,7 @@
       // everything in memory
       String message = String.format(
           "When using the minimum number of partitions %d we require %s memory but only have %s available. "
-              + "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
+              + "Forcing legacy behavior of using unbounded memory in order to prevent regressions.",
           numPartitions,
           FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
           FileUtils.byteCountToDisplaySize(allocator.getLimit()));
@@ -984,7 +984,7 @@
     // is enabled
     if (reason == null) {
       boolean fallbackEnabled = context.getOptions()
-          .getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+          .getBoolean(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY);
       if (fallbackEnabled) {
         logger.warn(
             "Spilling is disabled - not enough memory available for internal partitioning. Falling back"
@@ -992,8 +992,9 @@
       } else {
         throw UserException.resourceError().message(String.format(
             "Not enough memory for internal partitioning and fallback mechanism for "
-                + "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter "
-                + "session/system command or increase memory limit for Drillbit",
+                + "HashJoin to use unbounded memory is disabled.\n" +
+                "Either enable fallback option %s using ALTER "
+                + "SESSION/SYSTEM command or increase the memory limit for the Drillbit",
             ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
       }
     } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 76c60e8..4f22c5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -48,8 +48,9 @@
   void initialize();
   void clear();
   List<? extends PartitionOutgoingBatch> getOutgoingBatches();
+
   /**
-   * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
+   * Get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
    * @param index
    * @return PartitionOutgoingBatch that matches index within Partitioner. This method can
    * return null if index does not fall within boundary of this Partitioner
@@ -58,4 +59,4 @@
   OperatorStats getStats();
 
   TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index a3149b8..95e62e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -24,10 +24,11 @@
 import javax.inject.Named;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
@@ -50,7 +51,6 @@
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,9 +104,44 @@
     this.end = end;
     doSetup(context, incoming, null);
 
-    // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
-    // allocated.
-    if (popConfig.getDestinations().size() > 1000) {
+    // Consider the system/session option to allow the buffer size to shrink
+    // linearly with the increase in slice count, over some limit:
+    // exec.partition.mem_throttle:
+    // The default is 0, which leaves the current logic unchanged.
+    // If set to a positive value, then when the slice count exceeds that
+    // amount, the buffer size per sender is reduced.
+    // The reduction factor is 1 / (slice count - threshold), with a minimum
+    // batch size of 256 records.
+    //
+    // So, if we set the threshold at 2, and run 10 slices, each slice will
+    // get 1024 / 8 = 256 records.
+    //
+    // This option controls memory, but at an obvious cost of increasing overhead.
+    // One could argue that this is a good thing. As the number of senders
+    // increases, the number of records going to each sender decreases, which
+    // increases the time that batches must accumulate before they are sent.
+    //
+    // If the option is enabled, and buffer size reduction kicks in, you'll
+    // find an info-level log message which details the reduction:
+    // exec.partition.mem_throttle is set to 2: 10 receivers,
+    //       reduced send buffer size from 1024 to 256 rows
+    //
+    // See  DRILL-7675, DRILL-7686.
+    int destinationCount = popConfig.getDestinations().size();
+    int reductionCutoff = oContext.getFragmentContext().getOptions().getInt(
+        ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY);
+    if (reductionCutoff > 0 && destinationCount >= reductionCutoff) {
+      int reducedBatchSize = Math.max(256,
+          (DEFAULT_RECORD_BATCH_SIZE + 1) / (destinationCount - reductionCutoff));
+      outgoingRecordBatchSize = BaseAllocator.nextPowerOfTwo(reducedBatchSize) - 1;
+      logger.info("{} is set to {}: {} receivers, reduced send buffer size from {} to {} rows",
+          ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_KEY,
+          reductionCutoff, destinationCount,
+          DEFAULT_RECORD_BATCH_SIZE, outgoingRecordBatchSize);
+    } else if (destinationCount > 1000) {
+      // Half the outgoing record batch size if the number of senders exceeds 1000 to reduce the total amount of memory
+      // allocated.
+
       // Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors
       outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1)/2 - 1;
     }
@@ -114,7 +149,7 @@
     int fieldId = 0;
     for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
       // create outgoingBatches only for subset of Destination Points
-      if ( fieldId >= start && fieldId < end ) {
+      if (fieldId >= start && fieldId < end) {
         logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId);
         outgoingBatches.add(newOutgoingRecordBatch(stats, popConfig,
           context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId()));
@@ -149,7 +184,6 @@
    * generated inner class. Byte-code manipulation appears to fix up the byte codes
    * directly. The name is special, it must be "new" + inner class name.
    */
-
   protected OutgoingRecordBatch newOutgoingRecordBatch(
                                OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
                                FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
@@ -259,24 +293,23 @@
     private final AccountingDataTunnel tunnel;
     private final HashPartitionSender operator;
     private final FragmentContext context;
-    private final BufferAllocator allocator;
-    private final VectorContainer vectorContainer = new VectorContainer();
+    private final VectorContainer vectorContainer;
     private final int oppositeMinorFragmentId;
     private final OperatorStats stats;
 
-    private boolean isLast = false;
-    private boolean dropAll = false;
+    private boolean isLast;
+    private boolean dropAll;
     private int recordCount;
     private int totalRecords;
 
     public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
                                FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
       this.context = context;
-      this.allocator = allocator;
       this.operator = operator;
       this.tunnel = tunnel;
       this.stats = stats;
       this.oppositeMinorFragmentId = oppositeMinorFragmentId;
+      this.vectorContainer = new VectorContainer(allocator);
     }
 
     protected void copy(int inIndex) throws IOException {
@@ -376,9 +409,7 @@
     }
 
     private void allocateOutgoingRecordBatch() {
-      for (VectorWrapper<?> v : vectorContainer) {
-        v.getValueVector().allocateNew();
-      }
+      vectorContainer.allocate(outgoingRecordBatchSize);
     }
 
     public void updateStats(FragmentWritableBatch writableBatch) {
@@ -391,12 +422,7 @@
      * Initialize the OutgoingBatch based on the current schema in incoming RecordBatch
      */
     public void initializeBatch() {
-      for (VectorWrapper<?> v : incoming) {
-        // create new vector
-        ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
-        outgoingVector.setInitialCapacity(outgoingRecordBatchSize);
-        vectorContainer.add(outgoingVector);
-      }
+      vectorContainer.buildFrom(incoming.getSchema());
       allocateOutgoingRecordBatch();
       try {
         doSetup(incoming, vectorContainer);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 61850b7..6d8865d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -123,7 +123,6 @@
    * Hence we should make use of {@link BatchSchema#isEquivalent(BatchSchema)} method instead since
    * {@link MaterializedField#isEquivalent(MaterializedField)} method is updated to remove the reference check.
    */
-
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -145,7 +144,6 @@
 
     // Compare names.
     // (DRILL-5525: actually compares all fields.)
-
     if (!fields.equals(other.fields)) {
       return false;
     }
@@ -153,7 +151,6 @@
     // Compare types
     // (DRILL-5525: this code is redundant because any differences
     // will fail above.)
-
     for (int i = 0; i < fields.size(); i++) {
       MajorType t1 = fields.get(i).getType();
       MajorType t2 = other.fields.get(i).getType();
@@ -181,7 +178,6 @@
    * the {@link MaterializedField#isEquivalent(MaterializedField)} rules,
    * false otherwise
    */
-
   public boolean isEquivalent(BatchSchema other) {
     if (this == other) {
       return true;
@@ -209,17 +205,15 @@
   private boolean majorTypeEqual(MajorType t1, MajorType t2) {
     if (t1.equals(t2)) {
       return true;
-    }
-    if (!t1.getMinorType().equals(t2.getMinorType())) {
+    } else if (!t1.getMinorType().equals(t2.getMinorType())) {
       return false;
-    }
-    if (!t1.getMode().equals(t2.getMode())) {
+    } else if (!t1.getMode().equals(t2.getMode())) {
       return false;
-    }
-    if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) {
+    } else if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) {
       return false;
+    } else {
+      return true;
     }
-    return true;
   }
 
   /**
@@ -237,7 +231,6 @@
    * @param otherSchema the schema to merge with this one
    * @return the new, merged, schema
    */
-
   public BatchSchema merge(BatchSchema otherSchema) {
     if (selectionVectorMode != SelectionVectorMode.NONE ||
         otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
@@ -249,4 +242,17 @@
     mergedFields.addAll(otherSchema.fields);
     return new BatchSchema(selectionVectorMode, mergedFields);
   }
+
+  /**
+   * Format the schema into a multi-line format. Useful when debugging a query with
+   * a very wide schema as the usual single-line format is far too hard to read.
+   */
+  public String format() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("Batch Schema:\n");
+    for (MaterializedField field : fields) {
+      field.format(buf, 1);
+    }
+    return buf.toString();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 3796e5a..4ec0b8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -440,6 +440,14 @@
     return wrappers.size();
   }
 
+  public void allocate(int recordCount) {
+    for (VectorWrapper<?> w : wrappers) {
+      ValueVector v = w.getValueVector();
+      v.setInitialCapacity(recordCount);
+      v.allocateNew();
+    }
+  }
+
   public void allocateNew() {
     for (VectorWrapper<?> w : wrappers) {
       w.getValueVector().allocateNew();
@@ -559,4 +567,10 @@
       addOrGet(wrapper.getField());
     }
   }
+
+  public void buildFrom(BatchSchema sourceSchema) {
+    for (MaterializedField field : sourceSchema) {
+      add(TypeHelper.getNewVector(field, allocator));
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
index a502e07..ee786c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java
@@ -69,6 +69,11 @@
   }
 
   @Override
+  public int getInt(String name) {
+    return (int) getLong(name);
+  }
+
+  @Override
   public long getLong(String name) {
     return getByType(name, Kind.LONG).num_val;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
index e96d571..7e85a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java
@@ -83,10 +83,19 @@
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   boolean getBoolean(String name);
 
   /**
+   * Return the value of a long option as an int
+   *
+   * @param name option name
+   * @return the long value
+   * @throws IllegalArgumentException if the option is undefined or
+   * is not of the correct data type
+   */
+  int getInt(String name);
+
+  /**
    * Return the value of a long option.
    *
    * @param name option name
@@ -94,7 +103,6 @@
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   long getLong(String name);
 
   /**
@@ -105,7 +113,6 @@
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   double getDouble(String name);
 
   /**
@@ -116,6 +123,5 @@
    * @throws IllegalArgumentException if the option is undefined or
    * is not of the correct data type
    */
-
   String getString(String name);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 5d2598c..3eb643c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -188,6 +188,7 @@
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.PARTITIONER_MEMORY_REDUCTION_THRESHOLD_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
index 28a98aa..f24ea09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
@@ -28,15 +28,18 @@
 import com.sun.codemodel.JVar;
 
 public class CopyUtil {
-  public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper){
+
+  public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper) {
     // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
     int fieldId = 0;
 
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
-    for(VectorWrapper<?> vv : batch) {
+    for (VectorWrapper<?> vv : batch) {
       String copyMethod;
-      if (!Types.isFixedWidthType(vv.getField().getType()) || Types.isRepeated(vv.getField().getType()) || Types.isComplex(vv.getField().getType())) {
+      if (!Types.isFixedWidthType(vv.getField().getType()) ||
+           Types.isRepeated(vv.getField().getType()) ||
+           Types.isComplex(vv.getField().getType())) {
         copyMethod = "copyFromSafe";
       } else {
         copyMethod = "copyFrom";
@@ -53,19 +56,13 @@
           .build();
       JVar outVV = g.declareVectorValueSetupAndMember("outgoing", outFieldId);
 
-      if(hyper){
-
-        g.getEvalBlock().add(
-                outVV
+      if (hyper) {
+        g.getEvalBlock().add(outVV
                         .invoke(copyMethod)
-                        .arg(
-                                inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+                        .arg(inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
                         .arg(outIndex)
-                        .arg(
-                                inVV.component(inIndex.shrz(JExpr.lit(16)))
-                        )
-        );
-      }else{
+                        .arg(inVV.component(inIndex.shrz(JExpr.lit(16)))));
+      } else {
         g.getEvalBlock().add(outVV.invoke(copyMethod).arg(inIndex).arg(outIndex).arg(inVV));
       }
 
@@ -73,5 +70,4 @@
       fieldId++;
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 962d74e..1cecbd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -55,7 +55,7 @@
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.FailureUtils;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
@@ -65,6 +65,8 @@
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Date;
@@ -93,8 +95,8 @@
  */
 
 public class Foreman implements Runnable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-  private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
+  private static final Logger logger = LoggerFactory.getLogger(Foreman.class);
+  private static final Logger queryLogger = LoggerFactory.getLogger("query.logger");
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);
 
   public enum ProfileOption { SYNC, ASYNC, NONE }
@@ -108,8 +110,7 @@
   private final QueryManager queryManager; // handles lower-level details of query execution
   private final DrillbitContext drillbitContext;
   private final UserClientConnection initiatingClient; // used to send responses
-  private boolean resume = false;
-  private final ProfileOption profileOption;
+  private boolean resume;
 
   private final QueryResourceManager queryRM;
 
@@ -122,7 +123,7 @@
   private String queryText;
 
   private RuntimeFilterRouter runtimeFilterRouter;
-  private boolean enableRuntimeFilter;
+  private final boolean enableRuntimeFilter;
 
   /**
    * Constructor. Sets up the Foreman, but does not initiate any execution.
@@ -154,11 +155,9 @@
     this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
     this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
     this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
-    this.profileOption = setProfileOption(queryContext.getOptions());
     this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
   }
 
-
   /**
    * @return query id
    */
@@ -326,11 +325,15 @@
     }
   }
 
-  private ProfileOption setProfileOption(OptionManager options) {
-    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
+  private ProfileOption getProfileOption(QueryContext queryContext) {
+    if (queryContext.isSkipProfileWrite()) {
       return ProfileOption.NONE;
     }
-    if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
+    OptionSet options = queryContext.getOptions();
+    if (!options.getBoolean(ExecConstants.ENABLE_QUERY_PROFILE_OPTION)) {
+      return ProfileOption.NONE;
+    }
+    if (options.getBoolean(ExecConstants.QUERY_PROFILE_DEBUG_OPTION)) {
       return ProfileOption.SYNC;
     } else {
       return ProfileOption.ASYNC;
@@ -792,8 +795,8 @@
 
       // Debug option: write query profile before sending final results so that
       // the client can be certain the profile exists.
-      final boolean skipProfileWrite = queryContext.isSkipProfileWrite();
-      if (profileOption == ProfileOption.SYNC && !skipProfileWrite) {
+      ProfileOption profileOption = getProfileOption(queryContext);
+      if (profileOption == ProfileOption.SYNC) {
         queryManager.writeFinalProfile(uex);
       }
 
@@ -823,7 +826,7 @@
       // storage write; query completion occurs in parallel with profile
       // persistence.
 
-      if (profileOption == ProfileOption.ASYNC && !skipProfileWrite) {
+      if (profileOption == ProfileOption.ASYNC) {
         queryManager.writeFinalProfile(uex);
       }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 0c140a4..66739de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -58,12 +58,15 @@
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
+ * Each Foreman holds its own QueryManager. This manages the events associated
+ * with execution of a particular query across all fragments.
  */
 public class QueryManager implements AutoCloseable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(QueryManager.class);
 
   private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
   private final QueryId queryId;
@@ -240,10 +243,10 @@
   public void close() throws Exception { }
 
   /*
-     * This assumes that the FragmentStatusListener implementation takes action when it hears
-     * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
-     * but log messages.
-     */
+   * This assumes that the FragmentStatusListener implementation takes action when it hears
+   * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
+   * but log messages.
+   */
   private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
     /**
      * An enum of possible signals that {@link SignalListener} listens to.
@@ -461,13 +464,10 @@
    * there is a node failure, we can then correctly track how many outstanding messages will never arrive.
    */
   private class NodeTracker {
-    private final DrillbitEndpoint endpoint;
     private final AtomicInteger totalFragments = new AtomicInteger(0);
     private final AtomicInteger completedFragments = new AtomicInteger(0);
 
-    public NodeTracker(final DrillbitEndpoint endpoint) {
-      this.endpoint = endpoint;
-    }
+    public NodeTracker(final DrillbitEndpoint endpoint) { }
 
     /**
      * Increments the number of fragment this node is running.
@@ -506,7 +506,6 @@
       }
       return true;
     }
-
   }
 
   /**
@@ -556,7 +555,6 @@
     }
   };
 
-
   public DrillbitStatusListener getDrillbitStatusListener() {
     return drillbitStatusListener;
   }
@@ -601,7 +599,6 @@
             new ForemanException(String.format("One more more nodes lost connectivity during query.  Identified nodes were [%s].",
                 failedNodeList)));
       }
-
     }
   };
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 9e306e6..46ae53a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -541,6 +541,7 @@
     exec.java_compiler_janino_maxsize: 262144,
     exec.max_hash_table_size: 1073741824,
     exec.min_hash_table_size: 65536,
+    exec.partition.mem_throttle: 0,
     exec.persistent_table.umask: "002",
     exec.query.progress.update: true,
     exec.query_profile.debug_mode: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 80e2a26..c8b64a4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -80,7 +80,6 @@
     {
       // Properties here mimic those in drill-root/pom.xml, Surefire plugin
       // configuration. They allow tests to run successfully in Eclipse.
-
       put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false);
 
       // The CTTAS function requires that the default temporary workspace be
@@ -88,36 +87,30 @@
       // dfs.tmp. But, the test setup marks dfs.tmp as read-only. To work
       // around this, tests are supposed to use dfs. So, we need to
       // set the default temporary workspace to dfs.tmp.
-
       put(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE, DFS_TMP_SCHEMA);
       put(ExecConstants.HTTP_ENABLE, false);
       put("drill.catastrophic_to_standard_out", true);
 
       // Verbose errors.
-
       put(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, true);
 
       // See Drillbit.close. The Drillbit normally waits a specified amount
       // of time for ZK registration to drop. But, embedded Drillbits normally
       // don't use ZK, so no need to wait.
-
       put(ExecConstants.ZK_REFRESH, 0);
 
       // This is just a test, no need to be heavy-duty on threads.
       // This is the number of server and client RPC threads. The
       // production default is DEFAULT_SERVER_RPC_THREADS.
-
       put(ExecConstants.BIT_SERVER_RPC_THREADS, 2);
 
       // No need for many scanners except when explicitly testing that
       // behavior. Production default is DEFAULT_SCAN_THREADS
-
       put(ExecConstants.SCAN_THREADPOOL_SIZE, 4);
 
       // Define a useful root location for the ZK persistent
       // storage. Profiles will go here when running in distributed
       // mode.
-
       put(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT,
           "/tmp/drill/tests");
     }
@@ -147,9 +140,9 @@
       startDrillbits();
       applyOptions();
     } catch (Exception e) {
+
       // Translate exceptions to unchecked to avoid cluttering
       // tests. Failures will simply fail the test itself.
-
       throw new IllegalStateException("Cluster fixture setup failed", e);
     }
   }
@@ -158,7 +151,7 @@
    * Set the client properties to be used by client fixture.
    */
   private void setClientProps() {
-      clientProps = builder.clientProps;
+    clientProps = builder.clientProps;
   }
 
   public Properties getClientProps() {
@@ -166,17 +159,17 @@
   }
 
   private void configureZk() {
-    // Start ZK if requested.
 
+    // Start ZK if requested.
     String zkConnect;
     if (builder.zkHelper != null) {
-      // Case where the test itself started ZK and we're only using it.
 
+      // Case where the test itself started ZK and we're only using it.
       zkHelper = builder.zkHelper;
       ownsZK = false;
     } else if (builder.localZkCount > 0) {
-      // Case where we need a local ZK just for this test cluster.
 
+      // Case where we need a local ZK just for this test cluster.
       zkHelper = new ZookeeperHelper();
       zkHelper.startZookeeper(builder.localZkCount);
       ownsZK = true;
@@ -189,7 +182,6 @@
       // in config properties defined at run time. Drill does not allow
       // combining locally-set properties and a config file: it is one
       // or the other.
-
       if (builder.configBuilder().hasResource()) {
         throw new IllegalArgumentException("Cannot specify a local ZK while using an external config file.");
       }
@@ -202,27 +194,27 @@
   }
 
   private void createConfig() throws Exception {
+
     // Create a config
     // Because of the way DrillConfig works, we can set the ZK
     // connection string only if a property set is provided.
-
     config = builder.configBuilder.build();
 
     if (builder.usingZk) {
-      // Distribute drillbit using ZK (in-process or external)
 
+      // Distribute drillbit using ZK (in-process or external)
       serviceSet = null;
       usesZk = true;
     } else {
-      // Embedded Drillbit.
 
+      // Embedded Drillbit.
       serviceSet = RemoteServiceSet.getLocalServiceSet();
     }
   }
 
   private void startDrillbits() throws Exception {
-    // Start the Drillbits.
 
+    // Start the Drillbits.
     Preconditions.checkArgument(builder.bitCount > 0);
     int bitCount = builder.bitCount;
     for (int i = 0; i < bitCount; i++) {
@@ -230,7 +222,6 @@
       bit.run();
 
       // Bit name and registration.
-
       String name;
       if (builder.bitNames != null && i < builder.bitNames.length) {
         name = builder.bitNames[i];
@@ -239,7 +230,6 @@
         // Name the Drillbit by default. Most tests use one Drillbit,
         // so make the name simple: "drillbit." Only add a numeric suffix
         // when the test creates multiple bits.
-
         if (bitCount == 1) {
           name = DEFAULT_BIT_NAME;
         } else {
@@ -250,7 +240,6 @@
 
       // Remember the first Drillbit, this is the default one returned from
       // drillbit().
-
       if (i == 0) {
         defaultDrillbit = bit;
       }
@@ -330,7 +319,6 @@
    * @return a test client. Client will be closed when this cluster
    * fixture closes, or can be closed early
    */
-
   public ClientFixture client(String host, int port) {
     return clientBuilder()
       .property(DrillProperties.DRILLBIT_CONNECTION, String.format("%s:%d", host, port))
@@ -573,6 +561,8 @@
     props.setProperty(ExecConstants.UDF_DIRECTORY_ROOT, dirTestWatcher.getHomeDir().getAbsolutePath());
     props.setProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH, dirTestWatcher.getStoreDir().getAbsolutePath());
     props.setProperty(ExecConstants.UDF_DIRECTORY_FS, FileSystem.DEFAULT_FS);
+    // ALTER SESSION profiles are seldom interesting
+    props.setProperty(ExecConstants.SKIP_ALTER_SESSION_QUERY_PROFILE, Boolean.TRUE.toString());
 
     builder.configBuilder.configProps(props);
     return builder;
@@ -640,7 +630,6 @@
    * @param value the value to encode
    * @return the SQL-acceptable string equivalent
    */
-
   public static String stringify(Object value) {
     if (value == null) {
       return null;
@@ -654,8 +643,8 @@
   }
 
   public static String getResource(String resource) throws IOException {
-    // Unlike the Java routines, Guava does not like a leading slash.
 
+    // Unlike the Java routines, Guava does not like a leading slash.
     final URL url = Resources.getResource(trimSlash(resource));
     if (url == null) {
       throw new IOException(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
index 24beb3a..9bc5312 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixtureBuilder.java
@@ -214,7 +214,6 @@
     usingZk = true;
 
     // Using ZK. Turn refresh wait back on.
-
     return configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
   }
 
@@ -236,7 +235,6 @@
     usingZk = true;
 
     // Using ZK. Turn refresh wait back on.
-
     configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
     return this;
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index e1b5d99..40fe40e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -23,6 +23,7 @@
 import java.util.LinkedHashSet;
 import java.util.Objects;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -393,27 +394,9 @@
    */
   public String toString(boolean includeChildren) {
     final int maxLen = 10;
-    final StringBuilder builder = new StringBuilder();
-    builder
-      .append("[`")
-      .append(name)
-      .append("` (")
-      .append(type.getMinorType().name());
-
-    if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
-      builder.append("(");
-      builder.append(type.getPrecision());
-      if (type.hasScale() && type.getScale() > 0) {
-        builder.append(", ");
-        builder.append(type.getScale());
-      }
-      builder.append(")");
-    }
-
-    builder
-      .append(":")
-      .append(type.getMode().name())
-      .append(")");
+    final StringBuilder builder = new StringBuilder()
+      .append("[`");
+    prefix(builder);
 
     if (includeChildren) {
       if (type.getSubTypeCount() > 0) {
@@ -436,17 +419,63 @@
         .toString();
   }
 
+  private void prefix(StringBuilder builder) {
+    builder
+      .append(name)
+      .append("` (")
+      .append(type.getMinorType().name());
+
+    if (type.hasPrecision() && (type.getPrecision() > 0 || Types.isDecimalType(type))) {
+      builder.append("(");
+      builder.append(type.getPrecision());
+      if (type.hasScale() && type.getScale() > 0) {
+        builder.append(", ");
+        builder.append(type.getScale());
+      }
+      builder.append(")");
+    }
+
+    builder
+      .append(":")
+      .append(type.getMode().name())
+      .append(")");
+  }
+
   @Override
   public String toString() {
     return toString(true);
   }
 
+  public String format() {
+    final StringBuilder builder = new StringBuilder();
+    format(builder, 0);
+    return builder.toString();
+  }
+
+  /**
+   * Format the field in a multi-line format, with children (but not subtypes)
+   * indented. Useful for wide rows where the single-line format is too hard
+   * to read.
+   */
+  public void format(StringBuilder builder, int level) {
+    builder.append(StringUtils.repeat(' ', level));
+    prefix(builder);
+    if (children != null && ! children.isEmpty()) {
+      builder.append(":\n");
+      for (MaterializedField child : children) {
+        child.format(builder, level + 1);
+      }
+    } else {
+      builder.append("\n");
+    }
+  }
+
   /**
    * Return true if two fields have identical MinorType and Mode.
    */
   public boolean hasSameTypeAndMode(MaterializedField that) {
-    return (getType().getMinorType() == that.getType().getMinorType())
-        && (getType().getMode() == that.getType().getMode());
+    return getType().getMinorType() == that.getType().getMinorType()
+        && getType().getMode() == that.getType().getMode();
   }
 
   private String toString(Collection<?> collection, int maxLen) {