DRILL-7506: Simplify code gen error handling

Pushes code gen error handling closer to the code gen itself to
allow clearer error messages. Doing so avoids the need to bubble
code gen exceptions up the call stack, resulting in cleaner
operator code.

closes #1948
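
The pattern applied throughout the patch, sketched below with hedged,
illustrative names (createFilterer and the fields collector, cg, context and
logger stand in for the usual operator members; they are not taken from any one
file): materialization and code-gen failures are reported as a UserException at
the point of failure rather than declared as checked exceptions on every method
up the call chain. A sketch of the new schemaChangeError() builder follows the
UserException.java diff below.

    // Before: the checked exceptions ripple through every caller's signature.
    protected Filterer createFilterer() throws SchemaChangeException,
        ClassTransformationException, IOException {
      if (collector.hasErrors()) {
        throw new SchemaChangeException(collector.toErrorString());
      }
      return context.getImplementationClass(cg);
    }

    // After: errors surface as a UserException where they occur.
    protected Filterer createFilterer() {
      collector.reportErrors(logger);            // throws UserException on any error
      return context.getImplementationClass(cg); // wraps code-gen failures itself
    }
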
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 1ba8671..bf6508e 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -28,6 +28,7 @@
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Base class for all user exception. The goal is to separate out common error conditions where we can give users
  * useful feedback.
@@ -45,7 +46,7 @@
  */
 public class UserException extends DrillRuntimeException {
   private static final long serialVersionUID = -6720929331624621840L;
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserException.class);
+  private static final Logger logger = LoggerFactory.getLogger(UserException.class);
 
   public static final String MEMORY_ERROR_MSG = "One or more nodes ran out of memory while executing the query.";
 
@@ -356,6 +357,22 @@
   }
 
   /**
+   * Report an unsupported schema change.
+   *
+   * @param cause the <tt>SchemaChangeException</tt>. (Not typed because that
+   * class is not visible to this package.)
+   * @return user exception builder.
+   */
+  public static Builder schemaChangeError(final Throwable cause) {
+    return new Builder(DrillPBError.ErrorType.UNSUPPORTED_OPERATION, cause)
+        .addContext("Unsupported schema change");
+  }
+
+  public static Builder schemaChangeError() {
+    return schemaChangeError(null);
+  }
+
+  /**
    * Wraps an error that arises from execution due to issues in the query, in
    * the environment and so on -- anything other than "this should never occur"
    * type checks.
@@ -395,7 +412,6 @@
     return new Builder(DrillPBError.ErrorType.UNSPECIFIED_ERROR, cause);
   }
 
-
   /**
    * Builder class for DrillUserException. You can wrap an existing exception, in this case it will first check if
    * this exception is, or wraps, a DrillUserException. If it does then the builder will use the user exception as it is
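
For illustration, a hedged sketch of how the schemaChangeError() builders added
above are intended to be used at an operator call site (doSetup and innerSetup
are hypothetical; the real call sites in this patch are in HashJoinBatch,
LateralJoinBatch and the aggregate batches):

    // Hypothetical setup method; assumes the usual operator logger field.
    private void doSetup(RecordBatch incoming) {
      try {
        innerSetup(incoming);   // generated code; may throw SchemaChangeException
      } catch (SchemaChangeException e) {
        // Wraps the cause in a UserException with the UNSUPPORTED_OPERATION
        // error type and logs it via build(logger).
        throw UserException.schemaChangeError(e)
            .message("Schema change not supported by this operator")
            .build(logger);
      }
    }
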
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
index cf94446..8da2020 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
@@ -23,22 +23,31 @@
 import java.io.IOException;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachedVectorContainer.class);
+  private static final Logger logger = LoggerFactory.getLogger(CachedVectorContainer.class);
 
   private byte[] data;
   private final BufferAllocator allocator;
   private VectorContainer container;
 
-  public CachedVectorContainer(WritableBatch batch, BufferAllocator allocator) throws IOException {
+  public CachedVectorContainer(WritableBatch batch, BufferAllocator allocator) {
     VectorAccessibleSerializable va = new VectorAccessibleSerializable(batch, allocator);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    va.writeToStream(baos);
+    try {
+      va.writeToStream(baos);
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+          .addContext("Failed to write a cached batch to storage")
+          .build(logger);
+    }
     this.allocator = allocator;
     this.data = baos.toByteArray();
     va.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 17a2e0b..2d098ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -69,7 +69,6 @@
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.fn.AbstractFuncHolder;
 import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder;
@@ -104,14 +103,10 @@
   }
 
   public static LogicalExpression materializeAndCheckErrors(LogicalExpression expr,
-      VectorAccessible batch, FunctionLookupContext functionLookupContext)
-          throws SchemaChangeException {
+      VectorAccessible batch, FunctionLookupContext functionLookupContext) {
     ErrorCollector collector = new ErrorCollectorImpl();
     LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext, false, false);
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException(String.format(
-          "Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-    }
+    collector.reportErrors(logger);
     return e;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
index 9d30031..e795bcf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -18,20 +18,24 @@
 package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
-import java.io.IOException;
 import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Common implementation for both the test and production versions
  * of the fragment context.
  */
 public abstract class BaseFragmentContext implements FragmentContext {
+  private static final Logger logger = LoggerFactory.getLogger(BaseFragmentContext.class);
 
   private final FunctionImplementationRegistry funcRegistry;
 
@@ -45,27 +49,39 @@
   }
 
   @Override
-  public <T> T getImplementationClass(final ClassGenerator<T> cg)
-      throws ClassTransformationException, IOException {
+  public <T> T getImplementationClass(final ClassGenerator<T> cg) {
     return getImplementationClass(cg.getCodeGenerator());
   }
 
   @Override
-  public <T> T getImplementationClass(final CodeGenerator<T> cg)
-      throws ClassTransformationException, IOException {
-    T instance = getCompiler().createInstance(cg);
+  public <T> T getImplementationClass(final CodeGenerator<T> cg) {
+    T instance;
+    try {
+      instance = getCompiler().createInstance(cg);
+    } catch (ClassTransformationException e) {
+      throw UserException.internalError(e)
+          .message("Code generation error - likely code error.")
+          .build(logger);
+    }
     CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this);
     return instance;
   }
 
   @Override
-  public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
+  public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) {
     return getImplementationClass(cg.getCodeGenerator(), instanceCount);
   }
 
   @Override
-  public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
-    List<T> instances = getCompiler().createInstances(cg, instanceCount);
+  public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) {
+    List<T> instances;
+    try {
+      instances = getCompiler().createInstances(cg, instanceCount);
+    } catch (ClassTransformationException e) {
+      throw UserException.internalError(e)
+          .message("Code generation error - likely code error.")
+          .build(logger);
+    }
     instances.forEach(instance -> CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this));
     return instances;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 619f2d1..ac981da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.ops;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +26,6 @@
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.compile.CodeCompiler;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionLookupContext;
@@ -70,8 +68,7 @@
    * @param cg the class generator
    * @return an instance of the generated class
    */
-  <T> T getImplementationClass(final ClassGenerator<T> cg)
-      throws ClassTransformationException, IOException;
+  <T> T getImplementationClass(final ClassGenerator<T> cg);
 
   /**
    * Generates code for a class given a {@link CodeGenerator},
@@ -82,8 +79,7 @@
    * @param cg the code generator
    * @return an instance of the generated class
    */
-  <T> T getImplementationClass(final CodeGenerator<T> cg)
-      throws ClassTransformationException, IOException;
+  <T> T getImplementationClass(final CodeGenerator<T> cg);
 
   /**
    * Generates code for a class given a {@link ClassGenerator}, and returns the
@@ -94,8 +90,7 @@
    * @param cg the class generator
    * @return list of instances of the generated class
    */
-  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount)
-      throws ClassTransformationException, IOException;
+  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount);
 
   /**
    * Returns the statement type (e.g. SELECT, CTAS, ANALYZE) from the query context.
@@ -108,8 +103,7 @@
    * Get this node's identity.
    * @return A DrillbitEndpoint object.
    */
-  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount)
-      throws ClassTransformationException, IOException;
+  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount);
 
   /**
    * Return the set of execution controls used to inject faults into running
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
index f26e78d..e9c5917 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.base;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -24,19 +25,18 @@
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PhysicalOperatorUtil {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class);
+  private static final Logger logger = LoggerFactory.getLogger(PhysicalOperatorUtil.class);
 
-
-  private PhysicalOperatorUtil() {}
+  private PhysicalOperatorUtil() { }
 
   public static Set<Class<? extends PhysicalOperator>> getSubTypes(ScanResult classpathScan) {
     final Set<Class<? extends PhysicalOperator>> ops = classpathScan.getImplementations(PhysicalOperator.class);
@@ -46,16 +46,21 @@
   }
 
   /**
-   * Helper method to create a list of MinorFragmentEndpoint instances from a given endpoint assignment list.
+   * Helper method to create a list of {@code MinorFragmentEndpoint} instances from a
+   * given endpoint assignment list.
    *
-   * @param endpoints Assigned endpoint list. Index of each endpoint in list indicates the MinorFragmentId of the
-   *                  fragment that is assigned to the endpoint.
-   * @return
+   * @param endpoints
+   *          Assigned endpoint list. Index of each endpoint in list indicates
+   *          the MinorFragmentId of the fragment that is assigned to the
+   *          endpoint.
+   * @return a list of (minor fragment id, endpoint) pairs in which the
+   * minor fragment ID is reified as a member. Items are indexed by minor fragment
+   * ID.
    */
   public static List<MinorFragmentEndpoint> getIndexOrderedEndpoints(List<DrillbitEndpoint> endpoints) {
-    List<MinorFragmentEndpoint> destinations = Lists.newArrayList();
+    List<MinorFragmentEndpoint> destinations = new ArrayList<>();
     int minorFragmentId = 0;
-    for(DrillbitEndpoint endpoint : endpoints) {
+    for (DrillbitEndpoint endpoint : endpoints) {
       destinations.add(new MinorFragmentEndpoint(minorFragmentId, endpoint));
       minorFragmentId++;
     }
@@ -64,20 +69,18 @@
   }
 
   /**
-   * Helper method tp materialize the given logical expression using the ExpressionTreeMaterializer
+   * Helper method to materialize the given logical expression using the
+   * {@code ExpressionTreeMaterializer}.
    * @param expr Logical expression to materialize
    * @param incoming Incoming record batch
    * @param context Fragment context
    */
   public static LogicalExpression materializeExpression(LogicalExpression expr,
-      VectorAccessible incoming, FragmentContext context) throws SchemaChangeException {
+      VectorAccessible incoming, FragmentContext context) {
     ErrorCollector collector = new ErrorCollectorImpl();
     LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector,
             context.getFunctionRegistry());
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException("Failure while materializing expression. "
-          + collector.toErrorString());
-    }
+    collector.reportErrors(logger);
     return mle;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
index 3744b94..1fe1f83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
@@ -48,7 +48,7 @@
    * This method must be called before fetching the final priority queue hyper batch and final Sv4 vector.
-   * @throws SchemaChangeException
    */
-  void generate() throws SchemaChangeException;
+  void generate();
 
   /**
    * Retrieves the final priority queue HyperBatch containing the results. <b>Note:</b> this should be called
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 9518165..3ffa1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -27,6 +27,7 @@
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
@@ -138,10 +139,14 @@
   }
 
   @Override
-  public void generate() throws SchemaChangeException {
+  public void generate() {
     Stopwatch watch = Stopwatch.createStarted();
     final DrillBuf drillBuf = allocator.buffer(4 * queueSize);
-    finalSv4 = new SelectionVector4(drillBuf, queueSize, 4000);
+    try {
+      finalSv4 = new SelectionVector4(drillBuf, queueSize, 4000);
+    } catch (SchemaChangeException e) {
+      throw AbstractRecordBatch.schemaChangeException(e, "Priority Queue", logger);
+    }
     for (int i = queueSize - 1; i >= 0; i--) {
       finalSv4.set(i, pop());
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 615c8ea..b8f16d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -46,7 +45,6 @@
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
@@ -260,9 +258,7 @@
     }
 
     incomingSchema = incoming.getSchema();
-    if (!createAggregator()) {
-      state = BatchState.DONE;
-    }
+    createAggregator();
     for (VectorWrapper<?> w : container) {
       AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0);
     }
@@ -367,16 +363,10 @@
-   * @return true if the aggregator was setup successfully. false if there was a
-   *         failure.
    */
-  private boolean createAggregator() {
+  private void createAggregator() {
     try {
       stats.startSetup();
-      this.aggregator = createAggregatorInternal();
-      return true;
-    } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
-      context.getExecutorState().fail(ex);
-      container.clear();
-      incoming.kill(false);
-      return false;
+      aggregator = createAggregatorInternal();
     } finally {
       stats.stopSetup();
     }
@@ -387,8 +377,7 @@
     complexWriters.add(writer);
   }
 
-  protected HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException,
-      IOException {
+  protected HashAggregator createAggregatorInternal() {
     CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getOptions());
     ClassGenerator<HashAggregator> cg = top.getRoot();
     ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
@@ -430,9 +419,7 @@
         throw UserException.unsupportedError(new UnsupportedOperationException("Union type not supported in aggregate functions")).build(logger);
       }
 
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      }
+      collector.reportErrors(logger);
 
       if (expr == null) {
         continue;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1c2cd85..3185899 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -300,7 +300,7 @@
   @Override
   public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext,
                     RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
-                    ClassGenerator<?> cg, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+                    ClassGenerator<?> cg, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -371,7 +371,11 @@
     estRowWidth = extraRowBytes;
     estValuesRowWidth = extraRowBytes;
 
-    doSetup(incoming);
+    try {
+      doSetup(incoming);
+    } catch (SchemaChangeException e) {
+      throw HashAggBatch.schemaChangeException(e, "Hash Aggregate", logger);
+    }
   }
 
   /**
@@ -713,7 +717,6 @@
 
           return AggOutcome.RETURN_OUTCOME;
 
-        case STOP:
         default:
           return AggOutcome.CLEANUP_AND_RETURN;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 5ee77ab..ff6a0c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -17,13 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -50,7 +47,7 @@
   void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
              OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
              LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, ClassGenerator<?> cg,
-             TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
+             TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes);
 
   IterOutcome getOutcome();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c1d5740..11261cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -23,7 +23,6 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -36,7 +35,6 @@
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -165,7 +163,7 @@
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
+  public void buildSchema() {
     IterOutcome outcome = next(incoming);
     switch (outcome) {
       case NONE:
@@ -180,15 +178,13 @@
     }
 
     incomingSchema = incoming.getSchema();
-    if (!createAggregator()) {
-      state = BatchState.DONE;
-    }
+    createAggregator();
     container.allocateNew();
 
     if (complexWriters != null) {
       container.buildSchema(SelectionVectorMode.NONE);
     }
-    container.setRecordCount(0);
+    container.setEmpty();
   }
 
   @Override
@@ -243,10 +239,7 @@
         case STOP:
           return lastKnownOutcome;
         case OK_NEW_SCHEMA:
-          if (!createAggregator()) {
-            done = true;
-            return IterOutcome.STOP;
-          }
+          createAggregator();
           firstBatchForSchema = true;
           break;
         case EMIT:
@@ -422,22 +415,15 @@
   }
 
   /**
-   * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
-   * and informing the context of the failure state, as well is informing the upstream operators.
-   *
-   * @return true if the aggregator was setup successfully. false if there was a failure.
+   * Creates a new Aggregator based on the current schema. If setup fails, this
+   * method is responsible for cleaning up and informing the context of the
+   * failure state, as well as informing the upstream operators.
    */
-  private boolean createAggregator() {
+  private void createAggregator() {
     logger.debug("Creating new aggregator.");
     try {
       stats.startSetup();
-      this.aggregator = createAggregatorInternal();
-      return true;
-    } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
-      context.getExecutorState().fail(ex);
-      container.clear();
-      incoming.kill(false);
-      return false;
+      aggregator = createAggregatorInternal();
     } finally {
       stats.stopSetup();
     }
@@ -447,7 +433,7 @@
     complexWriters.add(writer);
   }
 
-  protected StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException {
+  protected StreamingAggregator createAggregatorInternal() {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
@@ -507,10 +493,7 @@
       }
     }
 
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-    }
-
+    collector.reportErrors(logger);
     setupIsSame(cg, keyExprs);
     setupIsSameApart(cg, keyExprs);
     addRecordValues(cg, valueExprs);
@@ -522,7 +505,11 @@
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(oContext, incoming, this, maxOutputRowCount);
+    try {
+      agg.setup(oContext, incoming, this, maxOutputRowCount);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
+    }
     allocateComplexWriters();
     return agg;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 6d06a58..b03e25f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -17,13 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.filter;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -44,7 +42,6 @@
 import org.slf4j.LoggerFactory;
 
 public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
-
   private static final Logger logger = LoggerFactory.getLogger(FilterRecordBatch.class);
 
   private SelectionVector2 sv2;
@@ -108,7 +105,7 @@
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     clearSv();
 
     switch (incoming.getSchema().getSelectionVectorMode()) {
@@ -162,17 +159,13 @@
     // allocate outgoing sv4
     container.buildSchema(SelectionVectorMode.FOUR_BYTE);
 
-    try {
-      final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
-      final Filterer filter = context.getImplementationClass(cg);
-      filter.setup(context, incoming, this, tx);
-      return filter;
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load generated class", e);
-    }
+    final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+    final Filterer filter = context.getImplementationClass(cg);
+    filter.setup(context, incoming, this, tx);
+    return filter;
   }
 
-  protected Filterer generateSV2Filterer() throws SchemaChangeException {
+  protected Filterer generateSV2Filterer() {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
     final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());
@@ -182,9 +175,7 @@
 
     final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
             context.getFunctionRegistry(), false, unionTypeEnabled);
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-    }
+    collector.reportErrors(logger);
 
     cg.addExpr(new ReturnValueExpression(expr), ClassGenerator.BlkCreateMode.FALSE);
 
@@ -193,16 +184,16 @@
       transfers.add(pair);
     }
 
+    final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+    CodeGenerator<Filterer> codeGen = cg.getCodeGenerator();
+    codeGen.plainJavaCapable(true);
+    final Filterer filter = context.getImplementationClass(codeGen);
     try {
-      final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
-      CodeGenerator<Filterer> codeGen = cg.getCodeGenerator();
-      codeGen.plainJavaCapable(true);
-      final Filterer filter = context.getImplementationClass(codeGen);
       filter.setup(context, incoming, this, tx);
-      return filter;
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
+    return filter;
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 34e9d7f..a90c9aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.flatten;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,7 +29,6 @@
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -372,7 +370,7 @@
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     allocationVectors = new ArrayList<>();
     container.clear();
     List<NamedExpression> exprs = getExpressionList();
@@ -427,10 +425,7 @@
 
       LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(),
           incoming, collector, context.getFunctionRegistry(), true);
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException(String.format(
-            "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
-      }
+      collector.reportErrors(logger);
       if (expr instanceof DrillFuncHolderExpr &&
           ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
         // Need to process ComplexWriter function evaluation.
@@ -477,8 +472,8 @@
     try {
       flattener = context.getImplementationClass(cg.getCodeGenerator());
       flattener.setup(context, incoming, this, transfers);
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
     return true;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 18af718..6977e18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -48,7 +48,6 @@
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -347,7 +346,7 @@
   }
 
   @Override
-  protected void buildSchema() throws SchemaChangeException {
+  protected void buildSchema() {
     // We must first get the schemas from upstream operators before we can build
     // our schema.
     boolean validSchema = prefetchFirstBatchFromBothSides();
@@ -373,11 +372,7 @@
         setupHashTable();
       }
 
-      try {
-        hashJoinProbe = setupHashJoinProbe();
-      } catch (IOException | ClassTransformationException e) {
-        throw new SchemaChangeException(e);
-      }
+      hashJoinProbe = setupHashJoinProbe();
     }
 
     // If we have a valid schema, this will build a valid container.
@@ -680,14 +675,12 @@
 
       return IterOutcome.NONE;
     } catch (SchemaChangeException e) {
-      context.getExecutorState().fail(e);
-      killIncoming(false);
-      return IterOutcome.STOP;
+      throw UserException.schemaChangeError(e).build(logger);
     }
   }
 
   /**
-   *  In case an upstream data is no longer needed, send a kill and flush any remaining batch
+   * In case the upstream data is no longer needed, send a kill and flush any remaining batches
    *
    * @param batch probe or build batch
    * @param upstream which upstream
@@ -703,7 +696,7 @@
   private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
   private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
 
-  private void setupHashTable() throws SchemaChangeException {
+  private void setupHashTable() {
     List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
@@ -722,9 +715,11 @@
       leftExpr = null;
     } else {
       if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
-        String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. ").append("Probe batch has selection mode = ").append
-          (probeBatch.getSchema().getSelectionVectorMode()).toString();
-        throw new SchemaChangeException(errorMsg);
+        throw UserException.internalError(null)
+          .message("Hash join does not support probe batch with selection vectors.")
+          .addContext("Probe batch has selection mode",
+              (probeBatch.getSchema().getSelectionVectorMode()).toString())
+          .build(logger);
       }
     }
 
@@ -739,15 +734,13 @@
     }
   }
 
-  private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException {
+  private void setupHash64(HashTableConfig htConfig) {
     LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()];
     ErrorCollector collector = new ErrorCollectorImpl();
     int i = 0;
     for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
       LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      }
+      collector.reportErrors(logger);
       if (expr == null) {
         continue;
       }
@@ -794,7 +787,9 @@
     try {
       hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
     } catch (Exception e) {
-      throw new SchemaChangeException("Failed to construct a field's hash64 dynamic codes", e);
+      throw UserException.internalError(e)
+            .message("Failed to construct a field's hash64 dynamic codes")
+            .build(logger);
     }
   }
 
@@ -1344,13 +1339,13 @@
       htStats.addStats(newStats);
     }
 
-    this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
-    this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
-    this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
-    this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
-    this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
-    this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
-    this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+    stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+    stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+    stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+    stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
+    stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+    stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
+    stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
   }
 
   /**
@@ -1458,10 +1453,8 @@
     super.close();
   }
 
-  public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
-
+  public HashJoinProbe setupHashJoinProbe() {
     //  No real code generation !!
-
     return new HashJoinProbeTemplate();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 00c9363..c2a2b56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -19,7 +19,10 @@
 
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -56,11 +59,12 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
- * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exists with UNNEST
- * operator. Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract}
+ * RecordBatch implementation for the lateral join operator. Currently LATERAL
+ * is expected to co-exist with the UNNEST operator. Both LATERAL and UNNEST
+ * share a contract with each other defined at {@link LateralContract}.
  */
 public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(LateralJoinBatch.class);
 
   // Maximum number records in the outgoing batch
   private int maxOutputRowCount;
@@ -90,13 +94,13 @@
   private boolean useMemoryManager = true;
 
   // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
-  private boolean isNewLeftBatch = false;
+  private boolean isNewLeftBatch;
 
   private final HashSet<String> excludedFieldNames = new HashSet<>();
 
   private final String implicitColumn;
 
-  private boolean hasRemainderForLeftJoin = false;
+  private boolean hasRemainderForLeftJoin;
 
   private ValueVector implicitVector;
 
@@ -126,11 +130,14 @@
   }
 
   /**
-   * Handles cases where previous output batch got full after processing all the batches from right side for a left
-   * side batch. But there are still few unprocessed rows in left batch which cannot be ignored because JoinType is
-   * LeftJoin.
-   * @return - true if all the rows in left batch is produced in output container
-   *           false if there is still some rows pending in left incoming container
+   * Handles cases where the previous output batch became full after processing
+   * all the batches from the right side for a left-side batch, but there are
+   * still a few unprocessed rows in the left batch which cannot be ignored
+   * because the JoinType is {@code LeftJoin}.
+   *
+   * @return - true if all the rows in the left batch were produced in the
+   *         output container; false if some rows are still pending in the left
+   *         incoming container
    */
   private boolean handleRemainingLeftRows() {
     Preconditions.checkState(popConfig.getJoinType() == JoinRelType.LEFT,
@@ -147,10 +154,12 @@
   }
 
   /**
-   * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is
-   * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty
-   * left incoming batch is received then it call's next on right branch to get an incoming and finally produces
-   * output.
+   * Gets the left and right incoming batches and produces the output batch. If
+   * the left incoming batch is empty then next on the right branch is not
+   * called and an empty batch with the correct outcome is returned. If a
+   * non-empty left incoming batch is received then it calls next on the right
+   * branch to get an incoming batch and finally produces output.
+   *
    * @return IterOutcome state of the lateral join batch
    */
   @Override
@@ -214,8 +223,8 @@
     // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
-    // output container based on new left schema and old right schema. If schema change failed then return STOP
-    // downstream
+    // output container based on new left schema and old right schema.
-    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
-      return STOP;
+    if (leftUpstream == OK_NEW_SCHEMA) {
+      handleSchemaChange();
     }
 
     // Setup the references of left, right and outgoing container in generated operator
@@ -280,8 +289,8 @@
   }
 
   /**
-   * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral
-   * to process the records at leftJoinIndex.
+   * Returns the left side incoming for the Lateral Join. Used by right branch
+   * leaf operator of Lateral to process the records at {@code leftJoinIndex}.
    *
    * @return - RecordBatch received as left side incoming
    */
@@ -294,8 +303,9 @@
   }
 
   /**
-   * Returns the current row index which the calling operator should process in current incoming left record batch.
-   * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never
+   * Returns the current row index which the calling operator should process in
+   * the current incoming left record batch. LATERAL should never return -1
+   * since that indicates the current left batch is empty and LATERAL will never
    * call next on right side with empty left batch
    *
    * @return - int - index of row to process.
@@ -320,10 +330,12 @@
    * ****************************************************************************************************************/
 
   /**
-   * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a
-   * failure outcome then we don't even call next on right branch, since there is no left incoming.
-   * @return true if both the left/right batch was received without failure outcome.
-   *         false if either of batch is received with failure outcome.
+   * Gets the left and right batch during the build schema phase for
+   * {@link LateralJoinBatch}. If the left batch sees a failure outcome then we
+   * don't even call next on the right branch, since there is no left incoming.
+   *
+   * @return true if both the left and right batches were received without a
+   *         failure outcome; false if either batch was received with a failure
+   *         outcome.
    */
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
@@ -355,7 +367,7 @@
-   * @throws SchemaChangeException if batch schema was changed during execution
    */
   @Override
-  protected void buildSchema() throws SchemaChangeException {
+  protected void buildSchema() {
     // Prefetch a RecordBatch from both left and right branch
     if (!prefetchFirstBatchFromBothSides()) {
       return;
@@ -394,18 +406,11 @@
    * Private Methods
    * ****************************************************************************************************************/
 
-  private boolean handleSchemaChange() {
+  private void handleSchemaChange() {
     try {
       stats.startSetup();
       logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema());
       setupNewSchema();
-      return true;
-    } catch (SchemaChangeException ex) {
-      logger.error("Failed to handle schema change hence killing the query");
-      context.getExecutorState().fail(ex);
-      left.kill(true); // Can have exchange receivers on left so called with true
-      right.kill(false); // Won't have exchange receivers on right side
-      return false;
     } finally {
       stats.stopSetup();
     }
@@ -416,11 +421,15 @@
   }
 
   /**
-   * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
-   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also
-   * when we populate the outgoing container then this method is called to get next left batch if current one is
-   * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of
-   * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome.
+   * Processes the left incoming batch with different
+   * {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is called
+   * from the main {@link LateralJoinBatch#innerNext()} block with each
+   * {@code next()} call from the upstream operator. Also, when we populate the
+   * outgoing container, this method is called to get the next left batch if
+   * the current one is fully processed. It calls {@code next()} on the left
+   * side until we get a non-empty RecordBatch or one of the
+   * {@code OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET} outcomes.
+   *
    * @return IterOutcome after processing current left batch
    */
   private IterOutcome processLeftBatch() {
@@ -451,12 +460,9 @@
           // If left batch is empty with actual schema change then just rebuild the output container and send empty
           // batch downstream
           if (emptyLeftBatch) {
-            if (handleSchemaChange()) {
-              leftJoinIndex = -1;
-              return OK_NEW_SCHEMA;
-            } else {
-              return STOP;
-            }
+            handleSchemaChange();
+            leftJoinIndex = -1;
+            return OK_NEW_SCHEMA;
           } // else - setup the new schema information after getting it from right side too.
         case OK:
           // With OK outcome we will keep calling next until we get a batch with >0 records
@@ -491,6 +497,8 @@
               "received NOT_YET");
           }
           break;
+        default:
+          throw new IllegalStateException("Unexpected iter outcome: " + leftUpstream.name());
       }
       needLeftBatch = leftJoinIndex == -1;
     }
@@ -498,10 +506,13 @@
   }
 
   /**
-   * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
-   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if
-   * left batch has some records in it. Also when we populate the outgoing container then this method is called to
-   * get next right batch if current one is fully processed.
+   * Processes the right incoming batch with different
+   * {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is called
+   * from the main {@link LateralJoinBatch#innerNext()} block with each next()
+   * call from the upstream operator, and only if the left batch has some
+   * records in it. Also, when we populate the outgoing container, this method
+   * is called to get the next right batch if the current one is fully processed.
+   *
    * @return IterOutcome after processing current left batch
    */
   private IterOutcome processRightBatch() {
@@ -531,13 +542,10 @@
           // Right batch with OK_NEW_SCHEMA can be non-empty so update the rightJoinIndex correctly and pass the
           // new schema downstream with empty batch and later with subsequent next() call the join output will be
           // produced
-          if (handleSchemaChange()) {
-            container.setRecordCount(0);
-            rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
-            return OK_NEW_SCHEMA;
-          } else {
-            return STOP;
-          }
+          handleSchemaChange();
+          container.setEmpty();
+          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
+          return OK_NEW_SCHEMA;
         case OK:
         case EMIT:
           // Even if there are no records we should not call next() again because in case of LEFT join empty batch is
@@ -557,19 +565,24 @@
               "received NOT_YET");
           }
           break;
+        default:
+          throw new IllegalStateException("Unexpected iter outcome: " + rightUpstream.name());
       }
     }
     return rightUpstream;
   }
 
   /**
-   * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the
-   * records in the either or both the batches are consumed then it get's next batch from that branch depending upon
-   * if output batch still has some space left. If output batch is full then the output is finalized to be sent
-   * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the
-   * output using that.
+   * Gets the current left and right incoming batches and does the cross join
+   * to fill the output batch. If all the records in either or both batches are
+   * consumed then it gets the next batch from that branch, depending upon
+   * whether the output batch still has some space left. If the output batch is
+   * full then the output is finalized to be sent downstream. Subsequent calls
+   * know how to consume previously half-consumed (if any) batches and produce
+   * the output from them.
    *
-   * @return - IterOutcome to be send along with output batch to downstream operator
+   * @return - IterOutcome to be sent along with the output batch to the
+   *         downstream operator
    */
   private IterOutcome produceOutputBatch() {
 
@@ -701,9 +714,7 @@
-        // in output container based on new left schema and old right schema. If schema change failed then return STOP
-        // downstream
+        // in output container based on new left schema and old right schema.
         if (leftUpstream == OK_NEW_SCHEMA && outputIndex == 0) {
-          if (!handleSchemaChange()) {
-            return STOP;
-          }
+          handleSchemaChange();
           // Since schema has change so we have new empty vectors in output container hence allocateMemory for them
           allocateVectors();
         } else {
@@ -766,6 +777,7 @@
    * @return - true - if newSchema is not same as oldSchema
    *         - false - if newSchema is same as oldSchema
    */
+  @SuppressWarnings("unused")
   private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
     return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema);
   }
@@ -815,7 +827,7 @@
    * Helps to create the outgoing container vectors based on known left and right batch schemas
-   * @throws SchemaChangeException
    */
-  private void setupNewSchema() throws SchemaChangeException {
+  private void setupNewSchema() {
 
     logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}",
       left.getSchema(), right.getSchema());
@@ -829,11 +841,15 @@
     rightSchema = batchSchemaWithNoExcludedCols(right.getSchema(), true);
 
     if (!verifyInputSchema(leftSchema)) {
-      throw new SchemaChangeException("Invalid Schema found for left incoming batch");
+      throw UserException.schemaChangeError()
+        .message("Invalid Schema found for left incoming batch of lateral join")
+        .build(logger);
     }
 
     if (!verifyInputSchema(rightSchema)) {
-      throw new SchemaChangeException("Invalid Schema found for right incoming batch");
+      throw UserException.schemaChangeError()
+        .message("Invalid Schema found for right incoming batch of lateral join")
+        .build(logger);
     }
 
     // Setup LeftSchema in outgoing container and also include implicit column if present in left side for multilevel
@@ -891,7 +907,7 @@
   }
 
   /**
-   * Simple method to allocate space for all the vectors in the container.
+   * Allocates space for all the vectors in the container.
    */
   private void allocateVectors() {
     // This check is here and will be true only in case of left join where the pending rows from previous left batch is
@@ -907,7 +923,7 @@
       setMaxOutputRowCount(batchMemoryManager.getOutputRowCount());
     }
 
-    for (VectorWrapper w : container) {
+    for (VectorWrapper<?> w : container) {
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
     }
@@ -926,16 +942,21 @@
       case NOT_YET:
         state = BatchState.DONE;
         return false;
+      default:
     }
     return true;
   }
 
   /**
-   * Creates a map of rowId to number of rows with that rowId in the right incoming batch of Lateral Join. It is
-   * expected from UnnestRecordBatch to add an implicit column of IntVectorType with each output row. All the array
-   * records belonging to same row in left incoming will have same rowId in the Unnest output batch.
+   * Creates a map of rowId to the number of rows with that rowId in the right
+   * incoming batch of Lateral Join. UnnestRecordBatch is expected to add an
+   * implicit column of IntVectorType with each output row. All the array
+   * records belonging to the same row in the left incoming batch will have
+   * the same rowId in the Unnest output batch.
+   *
    * @return - map of rowId to rowCount in right batch
    */
+  @SuppressWarnings("unused")
   private Map<Integer, Integer> getRowIdToRowCountMapping() {
     final Map<Integer, Integer> indexToFreq = new HashMap<>();
     final IntVector rowIdVector = (IntVector) implicitVector;
@@ -956,10 +977,12 @@
   }
 
   /**
-   * Main entry point for producing the output records. This method populates the output batch after cross join of
-   * the record in a given left batch at left index and all the corresponding rows in right batches produced by Unnest
-   * for current left batch. For each call to this function number of records copied in output batch is limited to
-   * maximum rows output batch can hold or the number of rows in right incoming batch
+   * Main entry point for producing the output records. This method populates
+   * the output batch after cross joining the record at the left index of a
+   * given left batch with all the corresponding rows in right batches produced
+   * by Unnest for the current left batch. For each call to this function, the
+   * number of records copied into the output batch is limited to the maximum
+   * rows the output batch can hold or the number of rows in the right incoming batch.
    */
   private void crossJoinAndOutputRecords() {
     final int rightRecordCount = right.getRecordCount();
@@ -1048,17 +1071,17 @@
   }
 
   /**
-   * Get's references of vector's from input and output vector container and create the mapping between them in
+   * Gets references of vectors from the input and output vector containers and creates the mapping between them in
    * respective maps. Example: for right incoming batch the references of input vector to corresponding output
    * vector will be stored in {@link LateralJoinBatch#rightInputOutputVector}. This is done here such that during
    * copy we don't have to figure out this mapping everytime for each input and output vector and then do actual copy.
    * There was overhead seen with functions {@link MaterializedField#getValueClass()} and
    * {@link RecordBatch#getValueAccessorById(Class, int...)} since it will be called for each row copy.
    * @param batch - Incoming RecordBatch
-   * @param startVectorIndex - StartIndex of output vector container
-   * @param endVectorIndex - endIndex of output vector container
-   * @param baseVectorIndex - delta to add in startIndex for getting vectors in output container
-   * @param isRightBatch - is batch passed left or right child
+   * @param startVectorIndex start index of output vector container
+   * @param endVectorIndex end index of output vector container
+   * @param baseVectorIndex delta to add to startVectorIndex when getting vectors from the output container
+   * @param isRightBatch whether the passed batch is the left or right child
    */
   private void setupInputOutputVectors(RecordBatch batch, int startVectorIndex, int endVectorIndex,
                                        int baseVectorIndex, boolean isRightBatch) {
@@ -1102,16 +1125,18 @@
   }
 
   /**
-   * Given a vector reference mapping between source and destination vector, copies data from all the source vectors
-   * at fromRowIndex to all the destination vectors in output batch at toRowIndex.
+   * Given a vector reference mapping between source and destination vector,
+   * copies data from all the source vectors at {@code fromRowIndex} to all the
+   * destination vectors in output batch at {@code toRowIndex}.
    *
-   * @param fromRowIndex - row index of all the vectors in batch to copy data from
-   * @param toRowIndex - row index of all the vectors in outgoing batch to copy data to
-   * @param mapping - source record batch holding vectors with data
-   * @param numRowsToCopy - Number of rows to copy into output batch
-   * @param isRightBatch - boolean to indicate if the fromIndex should also be increased or not. Since in case of
-   *                      copying data from left vector fromIndex is constant whereas in case of copying data from right
-   *                      vector fromIndex will move along with output index.
+   * @param fromRowIndex row index of all the vectors in batch to copy data from
+   * @param toRowIndex row index of all the vectors in outgoing batch to copy data to
+   * @param mapping mapping from source vectors to destination vectors
+   * @param numRowsToCopy number of rows to copy into output batch
+   * @param isRightBatch indicates whether the fromIndex should also be
+   *          increased: when copying data from the left vector the fromIndex
+   *          is constant, whereas when copying data from the right vector the
+   *          fromIndex moves along with the output index.
    */
   private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, Map<ValueVector, ValueVector> mapping,
                                        int numRowsToCopy, boolean isRightBatch) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index ef477b9..5c9525c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -38,7 +38,6 @@
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -65,14 +64,13 @@
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 
 import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
 
 /**
- * A join operator merges two sorted streams using record iterator.
+ * A join operator that merges two sorted streams using a record iterator.
  */
 public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
 
@@ -107,9 +105,6 @@
   private final JoinRelType joinType;
   private JoinWorker worker;
 
-  private static final String LEFT_INPUT = "LEFT INPUT";
-  private static final String RIGHT_INPUT = "RIGHT INPUT";
-
   private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
 
     MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
@@ -218,12 +213,8 @@
         try {
           logger.debug("Creating New Worker");
           stats.startSetup();
-          this.worker = generateNewWorker();
+          worker = generateNewWorker();
           first = true;
-        } catch (ClassTransformationException | IOException | SchemaChangeException e) {
-          context.getExecutorState().fail(new SchemaChangeException(e));
-          kill(false);
-          return IterOutcome.STOP;
         } finally {
           stats.stopSetup();
         }
@@ -311,7 +302,7 @@
     right.kill(sendUpstream);
   }
 
-  private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException {
+  private JoinWorker generateNewWorker() {
     final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // cg.getCodeGenerator().saveCodeForDebugging(true);
@@ -434,7 +425,11 @@
     }
 
     JoinWorker w = context.getImplementationClass(cg);
-    w.setupJoin(context, status, this.container);
+    try {
+      w.setupJoin(context, status, this.container);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
+    }
     return w;
   }
 
@@ -496,8 +491,9 @@
   }
 
   private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
-                                 LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression,
-                                 JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
+                                 LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch,
+                                 LogicalExpression[] rightExpression,
+                                 JVar incomingRightRecordBatch, ErrorCollector collector) {
 
     cg.setMappingSet(compareMapping);
     if (status.getRightStatus() != IterOutcome.NONE) {
@@ -539,18 +535,15 @@
   }
 
   private LogicalExpression materializeExpression(LogicalExpression expression, IterOutcome lastStatus,
-                                                  VectorAccessible input, ErrorCollector collector) throws ClassTransformationException {
+                                                  VectorAccessible input, ErrorCollector collector) {
     LogicalExpression materializedExpr;
     if (lastStatus != IterOutcome.NONE) {
-      materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry(), unionTypeEnabled);
+      materializedExpr = ExpressionTreeMaterializer.materialize(expression, input,
+          collector, context.getFunctionRegistry(), unionTypeEnabled);
     } else {
       materializedExpr = new TypedNullConstant(Types.optional(MinorType.INT));
     }
-    if (collector.hasErrors()) {
-      throw new ClassTransformationException(String.format(
-        "Failure while trying to materialize incoming field from %s batch.  Errors:\n %s.",
-        (input == leftIterator ? LEFT_INPUT : RIGHT_INPUT), collector.toErrorString()));
-    }
+    collector.reportErrors(logger);
     return materializedExpr;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 2dd5973..38f7aa0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
@@ -35,7 +34,6 @@
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -53,6 +51,7 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
@@ -183,6 +182,7 @@
           case NOT_YET:
             drainRight = false;
             break;
+          default:
         }
       }
       nljWorker.setupNestedLoopJoin(context, left, rightContainer, rightCounts, this);
@@ -213,9 +213,7 @@
     }
     right.kill(true);
     while (hasMore(rightUpstream)) {
-      for (final VectorWrapper<?> wrapper : right) {
-        wrapper.getValueVector().clear();
-      }
+      VectorAccessibleUtilities.clear(right);
       rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
     }
   }
@@ -232,7 +230,7 @@
    * 3. emitRight() -> Project record from the right side (which is a hyper container)
    * @return the runtime generated class that implements the NestedLoopJoin interface
    */
-  private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException, SchemaChangeException {
+  private NestedLoopJoin setupWorker()  {
     final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get(
         SETUP_LEFT_MAPPING, NestedLoopJoin.TEMPLATE_DEFINITION, context.getOptions());
     nLJCodeGenerator.plainJavaCapable(true);
@@ -267,10 +265,7 @@
         false,
         false);
 
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException(String.format("Failure while trying to materialize join condition. Errors:\n %s.",
-          collector.toErrorString()));
-    }
+    collector.reportErrors(logger);
 
     nLJClassGenerator.addExpr(new ReturnValueExpression(materialize), ClassGenerator.BlkCreateMode.FALSE);
 
@@ -358,59 +353,54 @@
    * @throws SchemaChangeException if batch schema was changed during execution
    */
   @Override
-  protected void buildSchema() throws SchemaChangeException {
-    try {
-      if (! prefetchFirstBatchFromBothSides()) {
-        return;
-      }
-
-      batchMemoryManager.update(RIGHT_INDEX, 0, true);
-      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
-        batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
-
-      if (leftUpstream != IterOutcome.NONE) {
-        leftSchema = left.getSchema();
-        container.copySchemaFrom(left);
-      }
-
-      if (rightUpstream != IterOutcome.NONE) {
-        // make right input schema optional if we have LEFT join
-        for (final VectorWrapper<?> vectorWrapper : right) {
-          TypeProtos.MajorType inputType = vectorWrapper.getField().getType();
-          TypeProtos.MajorType outputType;
-          if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
-            outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
-          } else {
-            outputType = inputType;
-          }
-          MaterializedField newField = MaterializedField.create(vectorWrapper.getField().getName(), outputType);
-          ValueVector valueVector = container.addOrGet(newField);
-          if (valueVector instanceof AbstractContainerVector) {
-            vectorWrapper.getValueVector().makeTransferPair(valueVector);
-            valueVector.clear();
-          }
-        }
-        rightSchema = right.getSchema();
-        addBatchToHyperContainer(right);
-      }
-
-      nljWorker = setupWorker();
-
-      // if left batch is empty, fetch next
-      if (leftUpstream != IterOutcome.NONE && left.getRecordCount() == 0) {
-        leftUpstream = next(LEFT_INPUT, left);
-      }
-
-      batchMemoryManager.update(LEFT_INDEX, 0);
-      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
-        batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
-
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      container.setEmpty();
-
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException(e);
+  protected void buildSchema() {
+    if (! prefetchFirstBatchFromBothSides()) {
+      return;
     }
+
+    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+      batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
+
+    if (leftUpstream != IterOutcome.NONE) {
+      leftSchema = left.getSchema();
+      container.copySchemaFrom(left);
+    }
+
+    if (rightUpstream != IterOutcome.NONE) {
+      // make right input schema optional if we have LEFT join
+      for (final VectorWrapper<?> vectorWrapper : right) {
+        TypeProtos.MajorType inputType = vectorWrapper.getField().getType();
+        TypeProtos.MajorType outputType;
+        if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
+        MaterializedField newField = MaterializedField.create(vectorWrapper.getField().getName(), outputType);
+        ValueVector valueVector = container.addOrGet(newField);
+        if (valueVector instanceof AbstractContainerVector) {
+          vectorWrapper.getValueVector().makeTransferPair(valueVector);
+          valueVector.clear();
+        }
+      }
+      rightSchema = right.getSchema();
+      addBatchToHyperContainer(right);
+    }
+
+    nljWorker = setupWorker();
+
+    // if left batch is empty, fetch next
+    if (leftUpstream != IterOutcome.NONE && left.getRecordCount() == 0) {
+      leftUpstream = next(LEFT_INPUT, left);
+    }
+
+    batchMemoryManager.update(LEFT_INDEX, 0);
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+      batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setEmpty();
   }
 
   private void addBatchToHyperContainer(RecordBatch inputBatch) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index 95c6acd..daf2352 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -38,22 +38,20 @@
  */
 public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
-
   // Current left input batch being processed
-  private RecordBatch left = null;
+  private RecordBatch left;
 
   // Record count of the left batch currently being processed
-  private int leftRecordCount = 0;
+  private int leftRecordCount;
 
   // List of record counts per batch in the hyper container
-  private List<Integer> rightCounts = null;
+  private List<Integer> rightCounts;
 
   // Output batch
-  private NestedLoopJoinBatch outgoing = null;
+  private NestedLoopJoinBatch outgoing;
 
   // Iteration status tracker
-  private IterationStatusTracker tracker = new IterationStatusTracker();
+  private final IterationStatusTracker tracker = new IterationStatusTracker();
 
   private int targetOutputRecords;
 
@@ -67,6 +65,7 @@
    * @param rightCounts Counts for each right container
    * @param outgoing Output batch
    */
+  @Override
   public void setupNestedLoopJoin(FragmentContext context,
                                   RecordBatch left,
                                   ExpandableHyperContainer rightContainer,
@@ -91,6 +90,7 @@
    * @param joinType join type (INNER ot LEFT)
    * @return the number of records produced in the output batch
    */
+  @Override
   public int outputRecords(JoinRelType joinType) {
     int outputIndex = 0;
     while (leftRecordCount != 0) {
@@ -188,7 +188,6 @@
             " found on the left side of NLJ.");
       case NONE:
       case NOT_YET:
-      case STOP:
         leftRecordCount = 0;
         break;
       case OK:
@@ -199,18 +198,22 @@
           outgoing.getRecordBatchStatsContext());
         leftRecordCount = left.getRecordCount();
         break;
+      default:
     }
   }
 
+  @Override
   public abstract void doSetup(@Named("context") FragmentContext context,
                                @Named("rightContainer") VectorContainer rightContainer,
                                @Named("leftBatch") RecordBatch leftBatch,
                                @Named("outgoing") RecordBatch outgoing);
 
+  @Override
   public abstract void emitRight(@Named("batchIndex") int batchIndex,
                                  @Named("recordIndexWithinBatch") int recordIndexWithinBatch,
                                  @Named("outIndex") int outIndex);
 
+  @Override
   public abstract void emitLeft(@Named("leftIndex") int leftIndex,
                                 @Named("outIndex") int outIndex);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 9443882..8845e7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -32,7 +32,6 @@
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -346,13 +345,7 @@
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
       // generate code for merge operations (copy and compare)
-      try {
-        merger = createMerger();
-      } catch (final SchemaChangeException e) {
-        logger.error("Failed to generate code for MergingReceiver.  {}", e);
-        context.getExecutorState().fail(e);
-        return IterOutcome.STOP;
-      }
+      merger = createMerger();
 
       // allocate the priority queue with the generated comparator
       this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() {
@@ -663,26 +656,28 @@
    * @return instance of a new merger based on generated code
    * @throws SchemaChangeException
    */
-  private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
+  private MergingReceiverGeneratorBase createMerger() {
+
+    final CodeGenerator<MergingReceiverGeneratorBase> cg =
+        CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION,
+            context.getOptions());
+    cg.plainJavaCapable(true);
+    // Uncomment this line to debug the generated code.
+    // cg.saveCodeForDebugging(true);
+    final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
+
+    ExpandableHyperContainer batch = null;
+    boolean first = true;
+    for (final RecordBatchLoader loader : batchLoaders) {
+      if (first) {
+        batch = new ExpandableHyperContainer(loader);
+        first = false;
+      } else {
+        batch.addBatch(loader);
+      }
+    }
 
     try {
-      final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getOptions());
-      cg.plainJavaCapable(true);
-      // Uncomment out this line to debug the generated code.
-      // cg.saveCodeForDebugging(true);
-      final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
-
-      ExpandableHyperContainer batch = null;
-      boolean first = true;
-      for (final RecordBatchLoader loader : batchLoaders) {
-        if (first) {
-          batch = new ExpandableHyperContainer(loader);
-          first = false;
-        } else {
-          batch.addBatch(loader);
-        }
-      }
-
       generateComparisons(g, batch);
 
       g.setMappingSet(COPIER_MAPPING_SET);
@@ -692,8 +687,8 @@
 
       merger.doSetup(context, batch, container);
       return merger;
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException(e);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
index 7c83b2d..6fcc956 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
@@ -18,8 +18,6 @@
 package org.apache.drill.exec.physical.impl.metadata;
 
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
@@ -27,7 +25,6 @@
 import org.apache.drill.exec.physical.impl.aggregate.HashAggregator;
 import org.apache.drill.exec.record.RecordBatch;
 
-import java.io.IOException;
 import java.util.List;
 
 public class MetadataHashAggBatch extends HashAggBatch {
@@ -38,8 +35,7 @@
   }
 
   @Override
-  protected HashAggregator createAggregatorInternal()
-      throws SchemaChangeException, ClassTransformationException, IOException {
+  protected HashAggregator createAggregatorInternal() {
     MetadataHashAggPOP popConfig = (MetadataHashAggPOP) this.popConfig;
 
     valueExpressions = new MetadataAggregateHelper(popConfig.getContext(),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
index d982179..8196e2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
@@ -18,9 +18,7 @@
 package org.apache.drill.exec.physical.impl.metadata;
 
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
@@ -28,24 +26,25 @@
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator;
 import org.apache.drill.exec.record.RecordBatch;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
- * Operator which adds aggregate calls for all incoming columns to calculate required metadata and produces aggregations.
- * If aggregation is performed on top of another aggregation, required aggregate calls for merging metadata will be added.
+ * Operator which adds aggregate calls for all incoming columns to calculate
+ * required metadata and produces aggregations. If aggregation is performed on
+ * top of another aggregation, required aggregate calls for merging metadata
+ * will be added.
  */
 public class MetadataStreamAggBatch extends StreamingAggBatch {
 
   private List<NamedExpression> valueExpressions;
 
-  public MetadataStreamAggBatch(MetadataStreamAggPOP popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  public MetadataStreamAggBatch(MetadataStreamAggPOP popConfig,
+      RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, incoming, context);
   }
 
   @Override
-  protected StreamingAggregator createAggregatorInternal()
-      throws SchemaChangeException, ClassTransformationException, IOException {
+  protected StreamingAggregator createAggregatorInternal() {
     MetadataStreamAggPOP popConfig = (MetadataStreamAggPOP) this.popConfig;
 
     valueExpressions = new MetadataAggregateHelper(popConfig.getContext(),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index d67ae42..c32cdbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.orderedpartitioner;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -40,7 +39,6 @@
 import org.apache.drill.exec.cache.DistributedMap;
 import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -166,7 +164,7 @@
     partitionKeyVector.clear();
   }
 
-  private boolean saveSamples() throws SchemaChangeException, ClassTransformationException, IOException {
+  private boolean saveSamples() {
     recordsSampled = 0;
     IterOutcome upstream;
 
@@ -205,7 +203,11 @@
 
       Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sortedSamples);
       SelectionVector4 sv4 = builder.getSv4();
-      sorter.setup(context, sv4, sortedSamples);
+      try {
+        sorter.setup(context, sv4, sortedSamples);
+      } catch (SchemaChangeException e) {
+        throw schemaChangeException(e, logger);
+      }
       sorter.sort(sv4, sortedSamples);
 
       // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
@@ -283,61 +285,53 @@
    * @return True is successful. False if failed.
    */
   private boolean getPartitionVectors() {
-    try {
-      if (!saveSamples()) {
-        return false;
-      }
-
-      CachedVectorContainer finalTable = null;
-
-      long val = minorFragmentSampleCount.incrementAndGet();
-      logger.debug("Incremented mfsc, got {}", val);
-
-      long fragmentsBeforeProceed =
-          (long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
-      String finalTableKey = mapKey + "final";
-
-      if (val == fragmentsBeforeProceed) { // we crossed the barrier, build table and get data.
-        buildTable();
-        finalTable = tableMap.get(finalTableKey);
-      } else {
-        // Wait until sufficient number of fragments have submitted samples, or proceed after xx ms passed
-        // TODO: this should be polling.
-
-        if (val < fragmentsBeforeProceed) {
-          waitUntilTimeOut(10);
-        }
-        for (int i = 0; i < 100 && finalTable == null; i++) {
-          finalTable = tableMap.get(finalTableKey);
-          if (finalTable != null) {
-            break;
-          }
-          waitUntilTimeOut(10);
-        }
-        if (finalTable == null) {
-          buildTable();
-        }
-        finalTable = tableMap.get(finalTableKey);
-      }
-
-      Preconditions.checkState(finalTable != null);
-
-      // Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in
-      // the rest of this operator
-      for (VectorWrapper<?> w : finalTable.get()) {
-        partitionVectors.add(w.getValueVector());
-      }
-
-    } catch (ClassTransformationException | IOException | SchemaChangeException ex) {
-      kill(false);
-      context.getExecutorState().fail(ex);
+    if (!saveSamples()) {
       return false;
-      // TODO InterruptedException
+    }
+
+    CachedVectorContainer finalTable = null;
+
+    long val = minorFragmentSampleCount.incrementAndGet();
+    logger.debug("Incremented mfsc, got {}", val);
+
+    long fragmentsBeforeProceed =
+        (long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
+    String finalTableKey = mapKey + "final";
+
+    if (val == fragmentsBeforeProceed) { // we crossed the barrier, build table and get data.
+      buildTable();
+      finalTable = tableMap.get(finalTableKey);
+    } else {
+      // Wait until sufficient number of fragments have submitted samples, or proceed after xx ms passed
+      // TODO: this should be polling.
+
+      if (val < fragmentsBeforeProceed) {
+        waitUntilTimeOut(10);
+      }
+      for (int i = 0; i < 100 && finalTable == null; i++) {
+        finalTable = tableMap.get(finalTableKey);
+        if (finalTable != null) {
+          break;
+        }
+        waitUntilTimeOut(10);
+      }
+      if (finalTable == null) {
+        buildTable();
+      }
+      finalTable = tableMap.get(finalTableKey);
+    }
+
+    Preconditions.checkState(finalTable != null);
+
+    // Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in
+    // the rest of this operator
+    for (VectorWrapper<?> w : finalTable.get()) {
+      partitionVectors.add(w.getValueVector());
     }
     return true;
   }
 
-  private void buildTable() throws SchemaChangeException, ClassTransformationException, IOException {
+  private void buildTable() {
 
     // Get all samples from distributed map
 
@@ -361,7 +355,11 @@
       // sort the data incoming samples.
       SelectionVector4 newSv4 = containerBuilder.getSv4();
       Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
-      sorter.setup(context, newSv4, allSamplesContainer);
+      try {
+        sorter.setup(context, newSv4, allSamplesContainer);
+      } catch (SchemaChangeException e) {
+        throw schemaChangeException(e, logger);
+      }
       sorter.sort(newSv4, allSamplesContainer);
 
       // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
@@ -413,7 +411,7 @@
    * @throws SchemaChangeException
    */
   private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
-      List<Ordering> orderings, List<ValueVector> localAllocationVectors) throws SchemaChangeException {
+      List<Ordering> orderings, List<ValueVector> localAllocationVectors) {
     ErrorCollector collector = new ErrorCollectorImpl();
     ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions());
     // Note: disabled for now. This may require some debugging:
@@ -429,10 +427,7 @@
           .clearMode().setMode(TypeProtos.DataMode.REQUIRED);
       TypeProtos.MajorType newType = builder.build();
       MaterializedField outputField = MaterializedField.create("f" + i++, newType);
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException(String.format(
-            "Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-      }
+      collector.reportErrors(logger);
 
       ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       localAllocationVectors.add(vector);
@@ -448,8 +443,8 @@
       SampleCopier sampleCopier = context.getImplementationClass(cg);
       sampleCopier.setupCopier(context, sv4, incoming, outgoing);
       return sampleCopier;
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException(e);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
   }
 
@@ -473,16 +468,9 @@
     if (batchQueue != null && batchQueue.size() > 0) {
       VectorContainer vc = batchQueue.poll();
       recordCount = vc.getRecordCount();
-      try {
 
-        // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
-        setupNewSchema(vc);
-      } catch (SchemaChangeException ex) {
-        kill(false);
-        logger.error("Failure during query", ex);
-        context.getExecutorState().fail(ex);
-        return IterOutcome.STOP;
-      }
+      // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
+      setupNewSchema(vc);
       doWork(vc);
       vc.zeroVectors();
       return IterOutcome.OK_NEW_SCHEMA;
@@ -498,24 +486,14 @@
 
     // If this is the first iteration, we need to generate the partition vectors before we can proceed
     if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
-      if (!getPartitionVectors()) {
-        close();
-        return IterOutcome.STOP;
-      }
+      getPartitionVectors();
 
       batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
       first = false;
 
       // Now that we have the partition vectors, we immediately process the first batch on the queue
       VectorContainer vc = batchQueue.poll();
-      try {
-        setupNewSchema(vc);
-      } catch (SchemaChangeException ex) {
-        kill(false);
-        logger.error("Failure during query", ex);
-        context.getExecutorState().fail(ex);
-        return IterOutcome.STOP;
-      }
+      setupNewSchema(vc);
       doWork(vc);
       vc.zeroVectors();
       recordCount = vc.getRecordCount();
@@ -539,14 +517,7 @@
       recordCount = 0;
       return upstream;
     case OK_NEW_SCHEMA:
-      try {
-        setupNewSchema(incoming);
-      } catch (SchemaChangeException ex) {
-        kill(false);
-        logger.error("Failure during query", ex);
-        context.getExecutorState().fail(ex);
-        return IterOutcome.STOP;
-      }
+      setupNewSchema(incoming);
       // fall through.
     case OK:
       doWork(incoming);
@@ -577,7 +548,7 @@
    * @param batch
    * @throws SchemaChangeException
    */
-  protected void setupNewSchema(VectorAccessible batch) throws SchemaChangeException {
+  protected void setupNewSchema(VectorAccessible batch) {
     container.clear();
     ErrorCollector collector = new ErrorCollectorImpl();
     List<TransferPair> transfers = Lists.newArrayList();
@@ -601,9 +572,7 @@
     int count = 0;
     for (Ordering od : popConfig.getOrderings()) {
       LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      }
+      collector.reportErrors(logger);
       cg.setMappingSet(incomingMapping);
       ClassGenerator.HoldingContainer left = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
       cg.setMappingSet(partitionMapping);
@@ -633,11 +602,11 @@
     container.add(this.partitionKeyVector);
     container.buildSchema(batch.getSchema().getSelectionVectorMode());
 
+    projector = context.getImplementationClass(cg);
     try {
-      this.projector = context.getImplementationClass(cg);
       projector.setup(context, batch, this, transfers, partitionVectors, partitions, popConfig.getRef());
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 6d495a8..18fe9bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.partitionsender;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +26,6 @@
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -43,6 +41,7 @@
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -55,12 +54,15 @@
 
 import com.carrotsearch.hppc.IntArrayList;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
 
 public class PartitionSenderRootExec extends BaseRootExec {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
+  private static final Logger logger = LoggerFactory.getLogger(PartitionSenderRootExec.class);
   private final RecordBatch incoming;
   private final HashPartitionSender operator;
   private PartitionerDecorator partitioner;
@@ -200,11 +202,6 @@
           logger.error("Error while flushing outgoing batches", e);
           context.getExecutorState().fail(e.getCause());
           return false;
-        } catch (SchemaChangeException e) {
-          incoming.kill(false);
-          logger.error("Error while setting up partitioner", e);
-          context.getExecutorState().fail(e);
-          return false;
         }
       case OK:
         try {
@@ -225,11 +222,11 @@
   }
 
   @VisibleForTesting
-  protected void createPartitioner() throws SchemaChangeException {
+  protected void createPartitioner() {
     createClassInstances(actualPartitions);
   }
 
-  private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
+  private List<Partitioner> createClassInstances(int actualPartitions) {
     // set up partitioning function
     final LogicalExpression expr = operator.getExpr();
     final ErrorCollector collector = new ErrorCollectorImpl();
@@ -242,11 +239,7 @@
     ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch");
 
     final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException(String.format(
-          "Failure while trying to materialize incoming schema.  Errors:\n %s.",
-          collector.toErrorString()));
-    }
+    collector.reportErrors(logger);
 
     // generate code to copy from an incoming value vector to the destination partition's outgoing value vector
     JExpression bucket = JExpr.direct("bucket");
@@ -258,47 +251,44 @@
 
     CopyUtil.generateCopies(cgInner, incoming, incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
 
+    // compile and setup generated code
+    List<Partitioner> subPartitioners = context.getImplementationClass(cg, actualPartitions);
+
+    final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
+    final int longTail = outGoingBatchCount % actualPartitions;
+    int startIndex = 0;
+    int endIndex = 0;
+
+    boolean success = false;
     try {
-      // compile and setup generated code
-      List<Partitioner> subPartitioners = context.getImplementationClass(cg, actualPartitions);
-
-      final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
-      final int longTail = outGoingBatchCount % actualPartitions;
-      int startIndex = 0;
-      int endIndex = 0;
-
-      boolean success = false;
-      try {
-        for (int i = 0; i < actualPartitions; i++) {
-          startIndex = endIndex;
-          endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
-          if (i < longTail) {
-            endIndex++;
-          }
-          final OperatorStats partitionStats = new OperatorStats(stats, true);
-          subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
-            cgInner, startIndex, endIndex);
+      for (int i = 0; i < actualPartitions; i++) {
+        startIndex = endIndex;
+        endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+        if (i < longTail) {
+          endIndex++;
         }
+        final OperatorStats partitionStats = new OperatorStats(stats, true);
+        subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+          cgInner, startIndex, endIndex);
+      }
 
-        partitioner = new PartitionerDecorator(subPartitioners, stats, context);
-        for (int index = 0; index < terminations.size(); index++) {
-          partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
-        }
-        terminations.clear();
+      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+      for (int index = 0; index < terminations.size(); index++) {
+        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+      }
+      terminations.clear();
 
-        success = true;
-      } finally {
-        if (!success) {
-          for (Partitioner p : subPartitioners) {
-            p.clear();
-          }
+      success = true;
+    } catch (SchemaChangeException e) {
+      throw AbstractRecordBatch.schemaChangeException(e, "Partition Sender", logger);
+    } finally {
+      if (!success) {
+        for (Partitioner p : subPartitioners) {
+          p.clear();
         }
       }
-      return subPartitioners;
-
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
+    return subPartitioners;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index c021023..5b2bdc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -19,13 +19,12 @@
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
@@ -135,11 +134,7 @@
           } else if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA && next != EMIT) {
             return next;
           } else if (next == IterOutcome.OK_NEW_SCHEMA) {
-            try {
-              setupNewSchema();
-            } catch (SchemaChangeException e) {
-              throw new RuntimeException(e);
-            }
+            setupNewSchema();
           }
           incomingRecordCount = incoming.getRecordCount();
           memoryManager.update();
@@ -150,9 +145,11 @@
     }
 
     if (complexWriters != null && getLastKnownOutcome() == EMIT) {
-      throw new UnsupportedOperationException("Currently functions producing complex types as output are not " +
-        "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
-        "function in the projection list of outermost query.");
+      throw UserException.unsupportedError()
+          .message("Currently functions producing complex types as output are not " +
+            "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
+            "function in the projection list of outermost query.")
+          .build(logger);
     }
 
     first = false;
@@ -233,7 +230,7 @@
     complexWriters.add(writer);
   }
 
-  public void doAlloc(int recordCount) {
+  private void doAlloc(int recordCount) {
     // Allocate vv in the allocationVectors.
     for (ValueVector v : allocationVectors) {
       AllocationHelper.allocateNew(v, recordCount);
@@ -247,7 +244,7 @@
     }
   }
 
-  public void setValueCount(int count) {
+  private void setValueCount(int count) {
     if (count == 0) {
       container.setEmpty();
       return;
@@ -270,7 +267,7 @@
   }
 
   @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  protected boolean setupNewSchema() {
     setupNewSchemaFromInput(incoming);
     if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
       container.buildSchema(SelectionVectorMode.NONE);
@@ -280,36 +277,38 @@
     }
   }
 
-  private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
+  private void setupNewSchemaFromInput(RecordBatch incomingBatch) {
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     setupNewSchema(incomingBatch, configuredBatchSize);
 
+    ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this,
+        container, callBack, incomingBatch);
+    ProjectionMaterializer em = new ProjectionMaterializer(context.getOptions(),
+        incomingBatch, popConfig.getExprs(), context.getFunctionRegistry(),
+        batchBuilder, unionTypeEnabled);
+    boolean saveCode = false;
+    // Uncomment this line to debug the generated code.
+    // saveCode = true;
+    projector = em.generateProjector(context, saveCode);
     try {
-      ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this,
-          container, callBack, incomingBatch);
-      ProjectionMaterializer em = new ProjectionMaterializer(context.getOptions(),
-          incomingBatch, popConfig.getExprs(), context.getFunctionRegistry(),
-          batchBuilder, unionTypeEnabled);
-      boolean saveCode = false;
-      // Uncomment this line to debug the generated code.
-      // saveCode = true;
-      projector = em.generateProjector(context, saveCode);
       projector.setup(context, incomingBatch, this, batchBuilder.transfers());
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    } catch (SchemaChangeException e) {
+      throw UserException.schemaChangeError(e)
+          .addContext("Unexpected schema change in the Project operator")
+          .build(logger);
     }
   }
 
   /**
-   * Handle Null input specially when Project operator is for query output. This happens when input return 0 batch
-   * (returns a FAST NONE directly).
+   * Handle Null input specially when Project operator is for query output.
+   * This happens when the input returns no batches (returns a FAST {@code NONE} directly).
    *
    * <p>
    * Project operator has to return a batch with schema derived using the following 3 rules:
    * </p>
    * <ul>
-   *  <li>Case 1:  *  ==>  expand into an empty list of columns. </li>
+   *  <li>Case 1:  * ==> expand into an empty list of columns. </li>
    *  <li>Case 2:  regular column reference ==> treat as nullable-int column </li>
    *  <li>Case 3:  expressions => Call ExpressionTreeMaterialization over an empty vector contain.
    *           Once the expression is materialized without error, use the output type of materialized
@@ -318,7 +317,7 @@
    *
    * <p>
    * The batch is constructed with the above rules, and recordCount = 0.
-   * Returned with OK_NEW_SCHEMA to down-stream operator.
+   * Returned with {@code OK_NEW_SCHEMA} to down-stream operator.
    * </p>
    */
   @Override
@@ -331,14 +330,7 @@
     emptyVC.buildSchema(SelectionVectorMode.NONE);
     RecordBatch emptyIncomingBatch = new SimpleRecordBatch(emptyVC, context);
 
-    try {
-      setupNewSchemaFromInput(emptyIncomingBatch);
-    } catch (SchemaChangeException e) {
-      kill(false);
-      logger.error("Failure during query", e);
-      context.getExecutorState().fail(e);
-      return IterOutcome.STOP;
-    }
+    setupNewSchemaFromInput(emptyIncomingBatch);
 
     doAlloc(0);
     container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
index 6dcb428..ee011b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -38,8 +37,6 @@
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.DrillFuncHolderExpr;
@@ -134,8 +131,7 @@
     cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, options);
   }
 
-  public Projector generateProjector(FragmentContext context, boolean saveCode)
-      throws ClassTransformationException, IOException, SchemaChangeException {
+  public Projector generateProjector(FragmentContext context, boolean saveCode) {
     long setupNewSchemaStartTime = System.currentTimeMillis();
     setup();
     CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
@@ -149,7 +145,7 @@
     return projector;
   }
 
-  private void setup() throws SchemaChangeException {
+  private void setup() {
     List<NamedExpression> exprs = exprSpec != null ? exprSpec
         : inferExpressions();
     isAnyWildcard = isAnyWildcard(exprs);
@@ -226,8 +222,7 @@
     return needed;
   }
 
-  private void setupExpression(NamedExpression namedExpression)
-      throws SchemaChangeException {
+  private void setupExpression(NamedExpression namedExpression) {
     result.clear();
     if (classify && namedExpression.getExpr() instanceof SchemaPath) {
       classifyExpr(namedExpression, result);
@@ -267,11 +262,7 @@
     LogicalExpression expr = ExpressionTreeMaterializer.materialize(
         namedExpression.getExpr(), incomingBatch, collector,
         functionLookupContext, true, unionTypeEnabled);
-    if (collector.hasErrors()) {
-      throw new SchemaChangeException(String.format(
-          "Failure while trying to materialize incoming schema.  Errors:\n %s.",
-          collector.toErrorString()));
-    }
+    collector.reportErrors(logger);
 
     // Add value vector to transfer if direct reference and this is allowed,
     // otherwise, add to evaluation stack.
@@ -291,8 +282,7 @@
     }
   }
 
-  private void setupImplicitColumnRef(NamedExpression namedExpression)
-      throws SchemaChangeException {
+  private void setupImplicitColumnRef(NamedExpression namedExpression) {
     // The value indicates which wildcard we are processing now
     Integer value = result.prefixMap.get(result.prefix);
     if (value != null && value == 1) {
@@ -335,11 +325,7 @@
 
         LogicalExpression expr = ExpressionTreeMaterializer.materialize(
             originalPath, incomingBatch, collector, functionLookupContext);
-        if (collector.hasErrors()) {
-          throw new SchemaChangeException(String.format(
-              "Failure while trying to materialize incomingBatch schema.  Errors:\n %s.",
-              collector.toErrorString()));
-        }
+        collector.reportErrors(logger);
 
         ValueVectorWriteExpression write = batchBuilder.addOutputVector(name,
             expr);
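
Note: the two hunks above show the pattern applied throughout this patch: the repeated
"if (collector.hasErrors()) throw new SchemaChangeException(...)" block collapses into a
single call on the collector. A minimal sketch of the resulting call shape, assuming an
operator that already holds an incoming batch, a logger and a function lookup context
(the local variable names here are illustrative, not taken from the patch):

    ErrorCollector collector = new ErrorCollectorImpl();
    LogicalExpression materialized = ExpressionTreeMaterializer.materialize(
        namedExpression.getExpr(), incomingBatch, collector, functionLookupContext);
    // Throws a UserException built against the supplied logger if any errors
    // were collected during materialization; otherwise it is a no-op.
    collector.reportErrors(logger);
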
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index a06f1d6..98f6327 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.sort;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
@@ -25,7 +24,6 @@
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -42,17 +40,22 @@
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
 
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
 public class SortBatch extends AbstractRecordBatch<Sort> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
+  private static final Logger logger = LoggerFactory.getLogger(SortBatch.class);
 
-  private final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  private final MappingSet mainMapping = new MappingSet((String) null, null,
+            ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  private final MappingSet leftMapping = new MappingSet("leftIndex", null,
+            ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  private final MappingSet rightMapping = new MappingSet("rightIndex", null,
+            ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
 
   private final RecordBatch incoming;
   private final SortRecordBatchBuilder builder;
@@ -97,60 +100,56 @@
       return IterOutcome.NONE;
     }
 
-    try{
-      outer: while (true) {
-        IterOutcome upstream = incoming.next();
-        switch (upstream) {
-        case NONE:
-          break outer;
-        case NOT_YET:
-          throw new UnsupportedOperationException();
-        case STOP:
-          return upstream;
-        case OK_NEW_SCHEMA:
-          // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
-          if (!incoming.getSchema().equals(schema)) {
-            if (schema != null) {
-              throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
-            }
-            this.schema = incoming.getSchema();
+    outer: while (true) {
+      IterOutcome upstream = incoming.next();
+      switch (upstream) {
+      case NONE:
+        break outer;
+      case NOT_YET:
+        throw new UnsupportedOperationException();
+      case STOP:
+        return upstream;
+      case OK_NEW_SCHEMA:
+        // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
+        if (!incoming.getSchema().equals(schema)) {
+          if (schema != null) {
+            throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
           }
-          // fall through.
-        case OK:
-          if (!builder.add(incoming)) {
-            throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort.");
-          }
-          break;
-        default:
-          throw new UnsupportedOperationException();
+          schema = incoming.getSchema();
         }
+        // fall through.
+      case OK:
+        if (!builder.add(incoming)) {
+          throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort.");
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException();
       }
-
-      if (schema == null || builder.isEmpty()) {
-        // builder may be null at this point if the first incoming batch is empty
-        return IterOutcome.NONE;
-      }
-
-      builder.build(container);
-      sorter = createNewSorter();
-      sorter.setup(context, getSelectionVector4(), this.container);
-      sorter.sort(getSelectionVector4(), this.container);
-
-      return IterOutcome.OK_NEW_SCHEMA;
-
-    } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
-      kill(false);
-      logger.error("Failure during query", ex);
-      context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
     }
+
+    if (schema == null || builder.isEmpty()) {
+      // builder may be null at this point if the first incoming batch is empty
+      return IterOutcome.NONE;
+    }
+
+    builder.build(container);
+    sorter = createNewSorter();
+    try {
+      sorter.setup(context, getSelectionVector4(), this.container);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
+    }
+    sorter.sort(getSelectionVector4(), this.container);
+
+    return IterOutcome.OK_NEW_SCHEMA;
   }
 
-  private Sorter createNewSorter() throws ClassTransformationException, IOException, SchemaChangeException {
+  private Sorter createNewSorter() {
     return createNewSorter(this.context, this.popConfig.getOrderings(), this, mainMapping, leftMapping, rightMapping);
   }
 
-  public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
+  public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch) {
     final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
     final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
     final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
@@ -158,8 +157,8 @@
     return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
   }
 
-  public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
-          throws ClassTransformationException, IOException, SchemaChangeException{
+  public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings,
+      VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
     CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getOptions());
     // This operator may be deprecated. No tests exercise it.
     // There is no way, at present, to verify if the generated code
@@ -174,9 +173,7 @@
       // first, we rewrite the evaluation stack for each side of the comparison.
       ErrorCollector collector = new ErrorCollectorImpl();
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-      }
+      collector.reportErrors(logger);
       g.setMappingSet(leftMapping);
       HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
       g.setMappingSet(rightMapping);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index ce8a3aa..94770b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -23,10 +23,12 @@
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.AllocationReservation;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
@@ -39,16 +41,18 @@
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SortRecordBatchBuilder implements AutoCloseable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
+  static final Logger logger = LoggerFactory.getLogger(SortRecordBatchBuilder.class);
 
   private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
 
   private int recordCount;
   private long runningBatches;
   private SelectionVector4 sv4;
-  private BufferAllocator allocator;
+  private final BufferAllocator allocator;
   final AllocationReservation reservation;
 
   public SortRecordBatchBuilder(BufferAllocator a) {
@@ -134,13 +138,17 @@
     return batches.isEmpty();
   }
 
-  public void build(VectorContainer outputContainer) throws SchemaChangeException {
+  public void build(VectorContainer outputContainer) {
     outputContainer.clear();
     if (batches.keySet().size() > 1) {
-      throw new SchemaChangeException("Sort currently only supports a single schema.");
+      throw UserException.validationError(null)
+          .message("Sort currently only supports a single schema.")
+          .build(logger);
     }
     if (batches.size() > Character.MAX_VALUE) {
-      throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+      throw UserException.internalError(null)
+          .message("Sort cannot work on more than %d batches at a time.", Character.MAX_VALUE)
+          .build(logger);
     }
     if (batches.keys().size() < 1) {
       assert false : "Invalid to have an empty set of batches with no schemas.";
@@ -150,7 +158,11 @@
     if (svBuffer == null) {
       throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
     }
-    sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
+    try {
+      sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
+    } catch (SchemaChangeException e) {
+      throw AbstractRecordBatch.schemaChangeException(e, "Sort", logger);
+    }
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
index 22cc24f..f33d9d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -24,9 +24,10 @@
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public interface Sorter {
-  public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException;
+  public void setup(FragmentContext context, SelectionVector4 vector4,
+      VectorContainer hyperBatch) throws SchemaChangeException;
   public void sort(SelectionVector4 vector4, VectorContainer container);
 
-  public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class);
-
+  public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsAggBatch.java
index c418933..36de4df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsAggBatch.java
@@ -17,15 +17,16 @@
  */
 package org.apache.drill.exec.physical.impl.statistics;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.sun.codemodel.JExpr;
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FunctionCallFactory;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -50,6 +51,8 @@
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.metastore.statistics.Statistic;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /*
  * TODO: This needs cleanup. Currently the key values are constants and we compare the constants
@@ -75,10 +78,12 @@
  */
 
 public class StatisticsAggBatch extends StreamingAggBatch {
+  private static final Logger logger = LoggerFactory.getLogger(StatisticsAggBatch.class);
+
   // List of statistics functions e.g. rowcount, ndv output by StatisticsAggBatch
-  private List<String> functions;
+  private final List<String> functions;
   // List of implicit columns for which we do NOT want to compute statistics
-  private Map<String, ColumnExplorer.ImplicitFileColumns> implicitFileColumnsMap;
+  private final Map<String, ColumnExplorer.ImplicitFileColumns> implicitFileColumnsMap;
 
   public StatisticsAggBatch(StatisticsAggregate popConfig, RecordBatch incoming,
       FragmentContext context) throws OutOfMemoryException {
@@ -118,8 +123,7 @@
    * Creates the key column within the parent value vector
    */
   private void createNestedKeyColumn(MapVector parent, String name, LogicalExpression expr,
-      List<LogicalExpression> keyExprs, List<TypedFieldId> keyOutputIds)
-          throws SchemaChangeException {
+      List<LogicalExpression> keyExprs, List<TypedFieldId> keyOutputIds) {
     LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context);
     TypedFieldId id = createVVFieldId(mle, name, parent);
     keyExprs.add(mle);
@@ -131,7 +135,7 @@
    * is the column name and value is the statistic expression e.g. "salary" : NDV(emp.salary)
    */
   private void addMapVector(String name, MapVector parent, LogicalExpression expr,
-      List<LogicalExpression> valueExprs) throws SchemaChangeException {
+      List<LogicalExpression> valueExprs) {
     LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context);
     TypedFieldId id = createVVFieldId(mle, name, parent);
     valueExprs.add(new ValueVectorWriteExpression(id, mle, true));
@@ -141,8 +145,7 @@
    * Generates the code for the statistics aggregate which is subclassed from StreamingAggregator
    */
   private StreamingAggregator codegenAggregator(List<LogicalExpression> keyExprs,
-      List<LogicalExpression> valueExprs, List<TypedFieldId> keyOutputIds)
-          throws SchemaChangeException, ClassTransformationException, IOException {
+      List<LogicalExpression> valueExprs, List<TypedFieldId> keyOutputIds) {
 
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
@@ -168,13 +171,16 @@
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(oContext, incoming, this, ValueVector.MAX_ROW_COUNT);
+    try {
+      agg.setup(oContext, incoming, this, ValueVector.MAX_ROW_COUNT);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
+    }
     return agg;
   }
 
   @Override
-  protected StreamingAggregator createAggregatorInternal()
-      throws SchemaChangeException, ClassTransformationException, IOException {
+  protected StreamingAggregator createAggregatorInternal() {
     List<LogicalExpression> keyExprs = Lists.newArrayList();
     List<LogicalExpression> valueExprs = Lists.newArrayList();
     List<TypedFieldId> keyOutputIds = Lists.newArrayList();
@@ -190,7 +196,13 @@
         if (col.equals(colMeta[0])) {
           expr = ValueExpressions.getChar(SchemaPath.getSimplePath(mf.getName()).toString(), 0);
         } else {
-          expr = ValueExpressions.getChar(DrillStatsTable.getMapper().writeValueAsString(mf.getType()), 0);
+          try {
+            expr = ValueExpressions.getChar(DrillStatsTable.getMapper().writeValueAsString(mf.getType()), 0);
+          } catch (JsonProcessingException e) {
+            throw UserException.dataWriteError(e)
+                .addContext("Failed to write statistics to JSON")
+                .build();
+          }
         }
         // Ignore implicit columns
         if (!isImplicitFileOrPartitionColumn(mf, incoming.getContext().getOptions())) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 35f34b0..63bcb86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.union;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -33,7 +32,6 @@
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -92,7 +90,7 @@
   }
 
   @Override
-  protected void buildSchema() throws SchemaChangeException {
+  protected void buildSchema() {
     if (! prefetchFirstBatchFromBothSides()) {
       state = BatchState.DONE;
       return;
@@ -117,37 +115,30 @@
 
   @Override
   public IterOutcome innerNext() {
-    try {
-      while (true) {
-        if (!unionInputIterator.hasNext()) {
-          return IterOutcome.NONE;
-        }
-
-        Pair<IterOutcome, BatchStatusWrappper> nextBatch = unionInputIterator.next();
-        IterOutcome upstream = nextBatch.left;
-        BatchStatusWrappper batchStatus = nextBatch.right;
-
-        switch (upstream) {
-        case NONE:
-        case STOP:
-          return upstream;
-        case OK_NEW_SCHEMA:
-          return doWork(batchStatus, true);
-        case OK:
-          // skip batches with same schema as the previous one yet having 0 row.
-          if (batchStatus.batch.getRecordCount() == 0) {
-            VectorAccessibleUtilities.clear(batchStatus.batch);
-            continue;
-          }
-          return doWork(batchStatus, false);
-        default:
-          throw new IllegalStateException(String.format("Unknown state %s.", upstream));
-        }
+    while (true) {
+      if (!unionInputIterator.hasNext()) {
+        return IterOutcome.NONE;
       }
-    } catch (ClassTransformationException | IOException | SchemaChangeException ex) {
-      context.getExecutorState().fail(ex);
-      killIncoming(false);
-      return IterOutcome.STOP;
+
+      Pair<IterOutcome, BatchStatusWrappper> nextBatch = unionInputIterator.next();
+      IterOutcome upstream = nextBatch.left;
+      BatchStatusWrappper batchStatus = nextBatch.right;
+
+      switch (upstream) {
+      case NONE:
+        return upstream;
+      case OK_NEW_SCHEMA:
+        return doWork(batchStatus, true);
+      case OK:
+        // skip batches with the same schema as the previous one yet having 0 rows.
+        if (batchStatus.batch.getRecordCount() == 0) {
+          VectorAccessibleUtilities.clear(batchStatus.batch);
+          continue;
+        }
+        return doWork(batchStatus, false);
+      default:
+        throw new IllegalStateException(String.format("Unknown state %s.", upstream));
+      }
     }
   }
 
@@ -156,8 +147,7 @@
     return recordCount;
   }
 
-  private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema)
-      throws ClassTransformationException, IOException, SchemaChangeException {
+  private IterOutcome doWork(BatchStatusWrappper batchStatus, boolean newSchema) {
     Preconditions.checkArgument(batchStatus.batch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
         "Input batch and output batch have different field counthas!");
 
@@ -187,7 +177,7 @@
     }
   }
 
-  private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
+  private void createUnionAller(RecordBatch inputBatch) {
     transfers.clear();
     allocationVectors.clear();
 
@@ -219,28 +209,21 @@
         MaterializedField outputField = vvOut.getField();
 
         LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
-
-        if (collector.hasErrors()) {
-          throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-        }
+        collector.reportErrors(logger);
 
         // If the inputs' DataMode is required and the outputs' DataMode is not required
         // cast to the one with the least restriction
         if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
             && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
           expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
-          if (collector.hasErrors()) {
-            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-          }
+          collector.reportErrors(logger);
         }
 
         // If two inputs' MinorTypes are different,
         // Insert a cast before the Union operation
         if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
           expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
-          if (collector.hasErrors()) {
-            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-          }
+          collector.reportErrors(logger);
         }
 
         TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
@@ -255,7 +238,11 @@
     }
 
     unionall = context.getImplementationClass(cg.getCodeGenerator());
-    unionall.setup(context, inputBatch, this, transfers);
+    try {
+      unionall.setup(context, inputBatch, this, transfers);
+    } catch (SchemaChangeException e) {
+      throw schemaChangeException(e, logger);
+    }
   }
 
   // The output table's column names always follow the left table,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BaseWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BaseWrapper.java
index 0ff67c2..db21972 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BaseWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BaseWrapper.java
@@ -17,36 +17,22 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.slf4j.Logger;
 
 /**
  * Base class for code-generation-based tasks.
  */
-
 public abstract class BaseWrapper {
 
-  protected OperatorContext context;
+  protected final OperatorContext context;
 
   public BaseWrapper(OperatorContext context) {
     this.context = context;
   }
 
-  protected <T> T getInstance(CodeGenerator<T> cg, org.slf4j.Logger logger) {
-    try {
-      return context.getFragmentContext().getImplementationClass(cg);
-    } catch (ClassTransformationException e) {
-      throw UserException.unsupportedError(e)
-            .message("Code generation error - likely code error.")
-            .build(logger);
-    } catch (IOException e) {
-      throw UserException.resourceError(e)
-            .message("IO Error during code generation.")
-            .build(logger);
-    }
+  protected <T> T getInstance(CodeGenerator<T> cg, Logger logger) {
+    return context.getFragmentContext().getImplementationClass(cg);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 94ee022..4a83046 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -273,4 +274,15 @@
   public void checkContinue() {
     context.getExecutorState().checkContinue();
   }
+
+  protected UserException schemaChangeException(SchemaChangeException e, Logger logger) {
+    return schemaChangeException(e, getClass().getSimpleName(), logger);
+  }
+
+  public static UserException schemaChangeException(SchemaChangeException e,
+      String operator, Logger logger) {
+    return UserException.schemaChangeError(e)
+      .addContext("Unexpected schema change in %s operator", operator)
+      .build(logger);
+  }
 }
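
Note: the new helpers give operators a one-line way to convert the remaining checked
SchemaChangeException call sites (mostly setup() methods on generated code) into user
errors. A sketch of the intended call-site pattern, as used by SortBatch and
UnionAllRecordBatch earlier in this patch (the setup call shown is illustrative):

    try {
      unionall.setup(context, inputBatch, this, transfers);
    } catch (SchemaChangeException e) {
      // Wraps the checked exception in an UNSUPPORTED_OPERATION UserException
      // that names the failing operator, so nothing needs to be re-thrown up
      // the operator call stack.
      throw schemaChangeException(e, logger);
    }
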
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
index 7594fb7..1da3c9f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
@@ -20,13 +20,13 @@
 import org.junit.Test;
 
 public class TestCTASJson extends PlanTestBase {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestCTASJson.class);
 
-  @Test
   /**
-   * Test a source json file that contains records that are maps with fields of all types.
-   * Some records have missing fields. CTAS should skip the missing fields
+   * Test a source json file that contains records that are maps with fields of
+   * all types. Some records have missing fields. CTAS should skip the missing
+   * fields
    */
+  @Test
   public void testctas_alltypes_map() throws Exception {
     String testName = "ctas_alltypes_map";
     test("use dfs.tmp");
@@ -39,7 +39,7 @@
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + ".json")
-          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json' ")
+          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json'")
           .optionSettingQueriesForTestQuery("alter session set store.json.writer.skip_null_fields = true") // DEFAULT
           .build()
           .run();
@@ -50,11 +50,12 @@
     }
   }
 
-  @Test
   /**
-   * Test a source json file that contains records that are maps with fields of all types.
-   * Some records have missing fields. CTAS should NOT skip the missing fields
+   * Test a source json file that contains records that are maps with fields of
+   * all types. Some records have missing fields. CTAS should NOT skip the
+   * missing fields
    */
+  @Test
   public void testctas_alltypes_map_noskip() throws Exception {
     String testName = "ctas_alltypes_map";
     test("use dfs.tmp");
@@ -67,7 +68,7 @@
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + "_out.json")
-          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json' ")
+          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json'")
           .optionSettingQueriesForTestQuery("alter session set store.json.writer.skip_null_fields = false") // change from DEFAULT
           .build()
           .run();
@@ -78,11 +79,12 @@
     }
   }
 
-  @Test
   /**
-   * Test a source json file that contains records that are maps with fields of all types.
-   * Some records have missing fields. CTAS should skip the missing fields
+   * Test a source json file that contains records that are maps with fields of
+   * all types. Some records have missing fields. CTAS should skip the missing
+   * fields
    */
+  @Test
   public void testctas_alltypes_repeatedmap() throws Exception {
     String testName = "ctas_alltypes_repeated_map";
     test("use dfs.tmp");
@@ -95,7 +97,7 @@
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + ".json")
-          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json' ")
+          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json'")
           .optionSettingQueriesForTestQuery(
               "alter session set store.json.writer.skip_null_fields = true") // DEFAULT
           .build()
@@ -107,11 +109,12 @@
     }
   }
 
-  @Test
   /**
-   * Test a source json file that contains records that are maps with fields of all types.
-   * Some records have missing fields. CTAS should NOT skip the missing fields
+   * Test a source json file that contains records that are maps with fields of
+   * all types. Some records have missing fields. CTAS should NOT skip the
+   * missing fields
    */
+  @Test
   public void testctas_alltypes_repeated_map_noskip() throws Exception {
     String testName = "ctas_alltypes_repeated_map";
     test("use dfs.tmp");
@@ -124,7 +127,7 @@
           .sqlQuery(query)
           .ordered()
           .jsonBaselineFile("json/" + testName + "_out.json")
-          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json' ")
+          .optionSettingQueriesForTestQuery("alter session set `store.format` = 'json'")
           .optionSettingQueriesForTestQuery(
               "alter session set store.json.writer.skip_null_fields = false") // change from DEFAULT
           .build()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index d8e3b2b..2fc9158 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -79,7 +79,6 @@
     this.allOutcomes = iterOutcomes;
   }
 
-  @Deprecated
   public MockRecordBatch(@Nullable FragmentContext context,
                          @Nullable OperatorContext oContext,
                          @NotNull List<VectorContainer> testContainers,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 0db2d3d..0e64600 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -20,15 +20,15 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
@@ -2462,9 +2462,10 @@
     try {
       ljBatch.next();
       fail();
+    } catch (UserException e) {
+      assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType());
     } catch (AssertionError | Exception error) {
-      assertTrue(error instanceof DrillRuntimeException);
-      assertTrue(error.getCause() instanceof SchemaChangeException);
+      fail();
     } finally {
       // Close all the resources for this test case
       ljBatch.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
index 23e6d4a..8580a37 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.physical.impl.project;
 
 import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -34,8 +36,10 @@
 public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
-   * Test that if empty input batch is received with OK_NEW_SCHEMA or EMIT outcome, then Project doesn't ignores
-   * these empty batches and instead return them downstream with correct outcomes.
+   * Test that if an empty input batch is received with OK_NEW_SCHEMA or EMIT
+   * outcome, then Project doesn't ignore these empty batches and instead
+   * returns them downstream with the correct outcomes.
+   *
    * @throws Throwable
    */
   @Test
@@ -49,19 +53,22 @@
       inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
 
     final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
-    final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
-      operatorFixture.getFragmentContext());
+    try (final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());) {
 
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertEquals(0, outputRecordCount);
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertEquals(0, outputRecordCount);
+    }
   }
 
   /**
-   * Test to show if a non-empty batch is accompanied with EMIT outcome then Project operator produces output for
-   * that batch and return the output using EMIT outcome.
+   * Test to show that if a non-empty batch is accompanied by the EMIT outcome,
+   * then the Project operator produces output for that batch and returns the
+   * output using the EMIT outcome.
+   *
    * @throws Throwable
    */
   @Test
@@ -75,14 +82,15 @@
       inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
 
     final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
-    final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
-      operatorFixture.getFragmentContext());
+    try (final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
+        operatorFixture.getFragmentContext());) {
 
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertEquals(1, outputRecordCount);
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertEquals(1, outputRecordCount);
+    }
   }
 
   /**
@@ -101,14 +109,15 @@
       inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
 
     final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
-    final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
-      operatorFixture.getFragmentContext());
+    try (final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
+          operatorFixture.getFragmentContext());) {
 
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertEquals(1, outputRecordCount);
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertEquals(1, outputRecordCount);
+    }
   }
 
   /**
@@ -134,16 +143,17 @@
       inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
 
     final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
-    final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
-      operatorFixture.getFragmentContext());
+    try (final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
+            operatorFixture.getFragmentContext());) {
 
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    outputRecordCount += projectBatch.getRecordCount();
-    // OK will not be received since it's was accompanied with empty batch
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
-    outputRecordCount += projectBatch.getRecordCount();
-    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.NONE);
-    assertEquals(1, outputRecordCount);
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      outputRecordCount += projectBatch.getRecordCount();
+      // OK will not be received since it was accompanied by an empty batch
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+      outputRecordCount += projectBatch.getRecordCount();
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.NONE);
+      assertEquals(1, outputRecordCount);
+    }
   }
 
   /**
@@ -170,16 +180,15 @@
       inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
 
     final Project projectConf = new Project(parseExprs("convert_fromJSON(name_left)", "name_columns"), null);
-    final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
-      operatorFixture.getFragmentContext());
+    try (final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
+            operatorFixture.getFragmentContext());) {
 
-    try {
-      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+      assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+      projectBatch.next(); // Fails
       fail();
-    } catch (Exception e) {
+    } catch (UserException e) {
       // exception is expected because of complex writers case
-      assertTrue(e instanceof UnsupportedOperationException);
+      assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType());
     }
   }
 
@@ -209,15 +218,14 @@
       inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
 
     final Project projectConf = new Project(parseExprs("convert_fromJSON(name_left)", "name_columns"), null);
-    final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
-      operatorFixture.getFragmentContext());
+    try (final ProjectRecordBatch projectBatch = new ProjectRecordBatch(projectConf, mockInputBatch,
+            operatorFixture.getFragmentContext());) {
 
-    try {
-      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      projectBatch.next(); // Fails
       fail();
-    } catch (Exception e) {
+    } catch (UserException e) {
       // exception is expected because of complex writers case
-      assertTrue(e instanceof UnsupportedOperationException);
+      assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType());
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortImpl.java
index ae7196a..bc9f3a1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortImpl.java
@@ -65,7 +65,6 @@
  * Tests the external sort implementation: the "guts" of the sort stripped of the
  * Volcano-protocol layer. Assumes the individual components are already tested.
  */
-
 @Category(OperatorTest.class)
 public class TestSortImpl extends DrillTest {
 
@@ -80,10 +79,8 @@
    * @param fixture operator fixture
    * @param sortOrder sort order as specified by {@link Ordering}
    * @param nullOrder null order as specified by {@link Ordering}
-   * @return the sort initialized sort implementation, ready to
-   * do work
+   * @return the initialized sort implementation, ready to do work
    */
-
   public static SortImpl makeSortImpl(OperatorFixture fixture,
                                String sortOrder, String nullOrder) {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
@@ -114,7 +111,6 @@
    * harvests the output. Subclasses define the specifics of the sort,
    * define the input data, and validate the output data.
    */
-
   public static class SortTestFixture {
     private final OperatorFixture fixture;
     private final List<RowSet> inputSets = new ArrayList<>();
@@ -193,9 +189,7 @@
    *
    * @param results sort results iterator
    * @param dest container that holds the sort results
-   * @return
    */
-
   private static RowSet toRowSet(SortResults results, VectorContainer dest) {
     if (results.getSv4() != null) {
       return HyperRowSetImpl.fromContainer(dest, results.getSv4());
@@ -209,9 +203,7 @@
   /**
    * Test for null input (no input batches). Note that, in this case,
    * we never see a schema.
-   * @throws Exception
    */
-
   @Test
   public void testNullInput() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
@@ -222,9 +214,7 @@
 
   /**
    * Test for an input with a schema, but only an empty input batch.
-   * @throws Exception
    */
-
   @Test
   public void testEmptyInput() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
@@ -238,9 +228,7 @@
 
   /**
    * Degenerate case: single row in single batch.
-   * @throws Exception
    */
-
   @Test
   public void testSingleRow() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
@@ -258,9 +246,7 @@
 
   /**
    * Degenerate case: two (unsorted) rows in single batch
-   * @throws Exception
    */
-
   @Test
   public void testSingleBatch() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
@@ -279,11 +265,8 @@
   }
 
   /**
-   * Degenerate case, one row in each of two
-   * (unsorted) batches.
-   * @throws Exception
+   * Degenerate case, one row in each of two (unsorted) batches.
    */
-
   @Test
   public void testTwoBatches() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
@@ -310,7 +293,6 @@
    * This ensures we visit each value twice, and that the sorted output will
    * be a continuous run of numbers in proper order.
    */
-
   public static class DataGenerator {
     private final OperatorFixture fixture;
     private final TupleMetadata schema;
@@ -340,7 +322,6 @@
      * @param target number of rows to generate
      * @return the prime step size
      */
-
     private static int guessStep(int target) {
       if (target < 10) {
         return 7;
@@ -374,7 +355,6 @@
    * Validate a sort output batch based on the expectation that the key
    * is an ordered sequence of integers, split across multiple batches.
    */
-
   public static class DataValidator {
     private final int targetCount;
     private final int batchSize;
@@ -417,7 +397,6 @@
    * @param dataGen input batch generator
    * @param validator validates output batches
    */
-
   public void runLargeSortTest(OperatorFixture fixture, DataGenerator dataGen,
                                DataValidator validator) {
     SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
@@ -460,7 +439,6 @@
    * @param fixture operator test fixture
    * @param rowCount number of rows to test
    */
-
   public void runJumboBatchTest(OperatorFixture fixture, int rowCount) {
     DataGenerator dataGen = new DataGenerator(fixture, rowCount, ValueVector.MAX_ROW_COUNT);
     DataValidator validator = new DataValidator(rowCount, ValueVector.MAX_ROW_COUNT);
@@ -470,8 +448,6 @@
   /**
    * Most tests have used small row counts because we want to probe specific bits
    * of interest. Try 1000 rows just to ensure things work
-   *
-   * @throws Exception
    */
   @Test
   public void testModerateBatch() throws Exception {
@@ -483,14 +459,10 @@
   /**
    * Hit the sort with the largest possible batch size to ensure nothing is lost
    * at the edges.
-   *
-   * @throws Exception
    */
-
   @Test
   public void testLargeBatch() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
-//      partyOnMemory(fixture.allocator());
       runJumboBatchTest(fixture, ValueVector.MAX_ROW_COUNT);
     }
   }
@@ -502,7 +474,6 @@
    *
    * @param allocator - used for allocating Drillbuf
    */
-
   @SuppressWarnings("unused")
   private void partyOnMemory(BufferAllocator allocator) {
     DrillBuf bufs[] = new DrillBuf[10];
@@ -526,7 +497,6 @@
    * @param colCount number of data (non-key) columns
    * @param rowCount number of rows to generate
    */
-
   public void runWideRowsTest(OperatorFixture fixture, int colCount, int rowCount) {
     SchemaBuilder builder = new SchemaBuilder()
         .add("key", MinorType.INT);
@@ -563,10 +533,7 @@
 
   /**
    * Test wide rows with the stock copier.
-   *
-   * @throws Exception
    */
-
   @Test
   public void testWideRows() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture(dirTestWatcher)) {
@@ -583,10 +550,7 @@
    * mechanism itself (that has also already been tested.) Rather it is
    * to ensure that, when those components are integrated into the
    * sort implementation, that the whole assembly does the right thing.
-   *
-   * @throws Exception
    */
-
   @Test
   public void testSpill() throws Exception {
     OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index e00fc93..5caaaa1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -25,6 +25,7 @@
 
 import org.apache.drill.categories.VectorTest;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -48,6 +49,7 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.Range;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
 
 @Category(VectorTest.class)
 public class ExpressionTreeMaterializerTest extends ExecTest {
@@ -189,6 +191,13 @@
       public int getErrorCount() {
         return errorCount;
       }
+
+      @Override
+      public void reportErrors(Logger logger) {
+        throw UserException.internalError(null)
+          .message("Code generation found %d errors", errorCount)
+          .build(logger);
+      }
     };
 
     LogicalExpression functionCallExpr = new FunctionCall("testFunc",
diff --git a/logical/src/main/java/org/apache/drill/common/expression/ErrorCollector.java b/logical/src/main/java/org/apache/drill/common/expression/ErrorCollector.java
index 7ca8bed..21429ac 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/ErrorCollector.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/ErrorCollector.java
@@ -20,26 +20,39 @@
 import org.apache.drill.common.types.TypeProtos.MajorType;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Range;
+import org.slf4j.Logger;
 
 public interface ErrorCollector {
 
-    public void addGeneralError(ExpressionPosition expr, String s);
+  public void addGeneralError(ExpressionPosition expr, String s);
 
-    public void addUnexpectedArgumentType(ExpressionPosition expr, String name, MajorType actual, MajorType[] expected, int argumentIndex);
+  public void addUnexpectedArgumentType(ExpressionPosition expr, String name,
+      MajorType actual, MajorType[] expected, int argumentIndex);
 
-    public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, Range<Integer> expected);
+  public void addUnexpectedArgumentCount(ExpressionPosition expr,
+      int actual, Range<Integer> expected);
 
-    public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, int expected);
+  public void addUnexpectedArgumentCount(ExpressionPosition expr,
+      int actual, int expected);
 
-    public void addNonNumericType(ExpressionPosition expr, MajorType actual);
+  public void addNonNumericType(ExpressionPosition expr, MajorType actual);
 
-    public void addUnexpectedType(ExpressionPosition expr, int index, MajorType actual);
+  public void addUnexpectedType(ExpressionPosition expr, int index, MajorType actual);
 
-    public void addExpectedConstantValue(ExpressionPosition expr, int actual, String s);
+  public void addExpectedConstantValue(ExpressionPosition expr, int actual, String s);
 
-    boolean hasErrors();
+  boolean hasErrors();
 
-    public int getErrorCount();
+  public int getErrorCount();
 
-    String toErrorString();
+  String toErrorString();
+
+  /**
+   * Checks for errors and throws a user exception if any are found.
+   * The caller thus need not implement its own error checking; just
+   * call this method.
+   *
+   * @param logger the logger against which the user exception is built if errors are found
+   */
+  void reportErrors(Logger logger);
 }
diff --git a/logical/src/main/java/org/apache/drill/common/expression/ErrorCollectorImpl.java b/logical/src/main/java/org/apache/drill/common/expression/ErrorCollectorImpl.java
index 18a0e89..383bd8d 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/ErrorCollectorImpl.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/ErrorCollectorImpl.java
@@ -17,99 +17,115 @@
  */
 package org.apache.drill.common.expression;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.visitors.ExpressionValidationError;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Range;
+import org.slf4j.Logger;
 
 public class ErrorCollectorImpl implements ErrorCollector {
-    List<ExpressionValidationError> errors;
+  private final List<ExpressionValidationError> errors = new ArrayList<>();
 
-    public ErrorCollectorImpl() {
-        errors = Lists.newArrayList();
+  private String addExpr(ExpressionPosition expr, String message) {
+    return String.format(
+        "Error in expression at index %d.  Error: %s. Full expression: %s.",
+        expr.getCharIndex(), message, expr.getExpression());
+  }
+
+  @Override
+  public void addGeneralError(ExpressionPosition expr, String s) {
+    errors.add(new ExpressionValidationError(addExpr(expr, s)));
+  }
+
+  @Override
+  public void addUnexpectedArgumentType(ExpressionPosition expr, String name,
+      MajorType actual, MajorType[] expected, int argumentIndex) {
+    errors.add(
+        new ExpressionValidationError(
+            addExpr(expr, String.format(
+                "Unexpected argument type. Index :%d Name: %s, Type: %s, Expected type(s): %s",
+                argumentIndex, name, actual, Arrays.toString(expected)
+            ))
+        )
+    );
+  }
+
+  @Override
+  public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, Range<Integer> expected) {
+    errors.add(new ExpressionValidationError(
+            addExpr(expr, String.format(
+                "Unexpected argument count. Actual argument count: %d, Expected range: %s",
+                actual, expected))
+    ));
+  }
+
+  @Override
+  public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, int expected) {
+    errors.add(new ExpressionValidationError(
+        addExpr(expr, String.format(
+            "Unexpected argument count. Actual argument count: %d, Expected count: %d",
+            actual, expected))
+    ));
+  }
+
+  @Override
+  public void addNonNumericType(ExpressionPosition expr, MajorType actual) {
+    errors.add(new ExpressionValidationError(
+        addExpr(expr, String.format(
+            "Unexpected numeric type. Actual type: %s", actual))
+    ));
+  }
+
+  @Override
+  public void addUnexpectedType(ExpressionPosition expr, int index, MajorType actual) {
+    errors.add(new ExpressionValidationError(
+        addExpr(expr, String.format(
+            "Unexpected argument type. Actual type: %s, Index: %d", actual, index))
+    ));
+  }
+
+  @Override
+  public void addExpectedConstantValue(ExpressionPosition expr, int actual, String s) {
+    errors.add(new ExpressionValidationError(
+        addExpr(expr, String.format(
+            "Unexpected constant value. Name: %s, Actual: %s", s, actual))
+    ));
+  }
+
+  @Override
+  public boolean hasErrors() {
+    return !errors.isEmpty();
+  }
+
+  @Override
+  public int getErrorCount() {
+    return errors.size();
+  }
+
+  @Override
+  public String toErrorString() {
+    return "\n" + Joiner.on("\n").join(errors);
+  }
+
+  @Override
+  public String toString() {
+    return toErrorString();
+  }
+
+  @Override
+  public void reportErrors(Logger logger) {
+    if (!hasErrors()) {
+      return;
     }
-
-    private String addExpr(ExpressionPosition expr, String message) {
-        return String.format("Error in expression at index %d.  Error: %s.  Full expression: %s.", expr.getCharIndex(), message, expr.getExpression());
-    }
-
-    @Override
-    public void addGeneralError(ExpressionPosition expr, String s) {
-        errors.add(new ExpressionValidationError(addExpr(expr, s)));
-    }
-
-    @Override
-    public void addUnexpectedArgumentType(ExpressionPosition expr, String name, MajorType actual, MajorType[] expected, int argumentIndex) {
-        errors.add(
-                new ExpressionValidationError(
-                        addExpr(expr, String.format(
-                                "Unexpected argument type. Index :%d Name: %s, Type: %s, Expected type(s): %s",
-                                argumentIndex, name, actual, Arrays.toString(expected)
-                        ))
-                )
-        );
-    }
-
-    @Override
-    public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, Range<Integer> expected) {
-        errors.add(new ExpressionValidationError(
-                addExpr(expr, String.format("Unexpected argument count. Actual argument count: %d, Expected range: %s", actual, expected))
-        ));
-    }
-
-    @Override
-    public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, int expected) {
-        errors.add(new ExpressionValidationError(
-                addExpr(expr, String.format("Unexpected argument count. Actual argument count: %d, Expected count: %d", actual, expected))
-        ));
-    }
-
-    @Override
-    public void addNonNumericType(ExpressionPosition expr, MajorType actual) {
-        errors.add(new ExpressionValidationError(
-                addExpr(expr, String.format("Unexpected numeric type. Actual type: %s", actual))
-        ));
-    }
-
-    @Override
-    public void addUnexpectedType(ExpressionPosition expr, int index, MajorType actual) {
-        errors.add(new ExpressionValidationError(
-                addExpr(expr, String.format("Unexpected argument type. Actual type: %s, Index: %d", actual, index))
-        ));
-    }
-
-    @Override
-    public void addExpectedConstantValue(ExpressionPosition expr, int actual, String s) {
-        errors.add(new ExpressionValidationError(
-                addExpr(expr, String.format("Unexpected constant value. Name: %s, Actual: %s", s, actual))
-        ));
-    }
-
-    @Override
-    public boolean hasErrors() {
-        return !errors.isEmpty();
-    }
-
-
-    @Override
-    public int getErrorCount() {
-      return errors.size();
-    }
-
-    @Override
-    public String toErrorString() {
-        return "\n" + Joiner.on("\n").join(errors);
-    }
-
-    @Override
-    public String toString() {
-      return toErrorString();
-    }
-
-
+    throw UserException.internalError(null)
+      .message("Failure while materializing expression.")
+      .addContext("Errors", toErrorString())
+      .build(logger);
+  }
 }
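As a rough sketch of the failure path (the logger and sample error text are illustrative; ExpressionPosition.UNKNOWN is the library's placeholder position):

// Sketch only: what a caller observes when the collector holds an error.
ErrorCollector collector = new ErrorCollectorImpl();
collector.addGeneralError(ExpressionPosition.UNKNOWN, "no function named foo()");
try {
  collector.reportErrors(logger); // hasErrors() is true, so this throws
} catch (UserException e) {
  // The exception message is "Failure while materializing expression." and
  // its "Errors" context holds the toErrorString() output; build(logger)
  // has already logged it.
}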