DRILL-7730: Improve web query efficiency

Implements a direct transfer of batches from Screen to web client.
Cleans up web client query processing to avoid duplicate schema
info.
Much related code cleanup.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 237aba1..fdbb971 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -93,6 +93,8 @@
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.drill.shaded.guava.com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.netty.channel.EventLoopGroup;
 
@@ -101,16 +103,15 @@
  * String into ByteBuf.
  */
 public class DrillClient implements Closeable, ConnectionThrottle {
+  private static Logger logger = LoggerFactory.getLogger(DrillClient.class);
   public static final String DEFAULT_CLIENT_NAME = "Apache Drill Java client";
-
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
-
   private static final ObjectMapper objectMapper = new ObjectMapper();
+
   private final DrillConfig config;
   private UserClient client;
   private DrillProperties properties;
   private volatile ClusterCoordinator clusterCoordinator;
-  private volatile boolean connected = false;
+  private volatile boolean connected;
   private final BufferAllocator allocator;
   private final int reconnectTimes;
   private final int reconnectDelay;
@@ -199,10 +200,12 @@
   }
 
   /**
-   * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
-   * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
+   * Sets whether the application is willing to accept complex types (Map,
+   * Arrays) in the returned result set. Default is {@code true}. If set to
+   * {@code false}, the complex types are returned as JSON encoded VARCHAR type.
    *
-   * @throws IllegalStateException if called after a connection has been established.
+   * @throws IllegalStateException
+   *           if called after a connection has been established.
    */
   public void setSupportComplexTypes(boolean supportComplexTypes) {
     if (connected) {
@@ -468,10 +471,10 @@
    */
   @Override
   public void close() {
-    if (this.client != null) {
-      this.client.close();
+    if (client != null) {
+      client.close();
     }
-    if (this.ownsAllocator && allocator != null) {
+    if (ownsAllocator && allocator != null) {
       DrillAutoCloseables.closeNoChecked(allocator);
     }
     if (ownsZkConnection) {
@@ -486,19 +489,17 @@
     }
     if (eventLoopGroup != null) {
       eventLoopGroup.shutdownGracefully();
+      eventLoopGroup = null;
     }
 
     if (executor != null) {
       executor.shutdownNow();
+      executor = null;
     }
 
-    // TODO:  Did DRILL-1735 changes cover this TODO?:
-    // TODO: fix tests that fail when this is called.
-    //allocator.close();
     connected = false;
   }
 
-
   /**
    * Return the server infos. Only available after connecting
    *
@@ -585,11 +586,15 @@
 
   /**
    * API to just plan a query without execution
+   *
    * @param type
    * @param query
-   * @param isSplitPlan - option to tell whether to return single or split plans for a query
-   * @return list of PlanFragments that can be used later on in {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType, java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)}
-   * to run a query without additional planning
+   * @param isSplitPlan
+   *          option to tell whether to return single or split plans for a query
+   * @return list of PlanFragments that can be used later on in
+   *         {@link #runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType,
+   *         java.util.List, org.apache.drill.exec.rpc.user.UserResultsListener)}
+   *         to run a query without additional planning
    */
   public DrillRpcFuture<QueryPlanFragments> planQuery(QueryType type, String query, boolean isSplitPlan) {
     GetQueryPlanFragments runQuery = GetQueryPlanFragments.newBuilder().setQuery(query).setType(type).setSplitPlan(isSplitPlan).build();
@@ -632,7 +637,7 @@
     client.submitQuery(resultsListener, query);
   }
 
-  /*
+  /**
    * Helper method to generate the UserCredentials message from the properties.
    */
   private UserBitShared.UserCredentials getUserCredentials() {
@@ -660,10 +665,10 @@
   }
 
   /**
-   * Get the list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
+   * Get the list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters.
    *
-   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
-   * @return The list of catalogs in <code>INFORMATION_SCHEMA.CATALOGS</code> table satisfying the given filters.
+   * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+   * @return The list of catalogs in {@code INFORMATION_SCHEMA.CATALOGS} table satisfying the given filters.
    */
   public DrillRpcFuture<GetCatalogsResp> getCatalogs(LikeFilter catalogNameFilter) {
     final GetCatalogsReq.Builder reqBuilder = GetCatalogsReq.newBuilder();
@@ -675,11 +680,11 @@
   }
 
   /**
-   * Get the list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
+   * Get the list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters.
    *
-   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
-   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
-   * @return The list of schemas in <code>INFORMATION_SCHEMA.SCHEMATA</code> table satisfying the given filters.
+   * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+   * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter.
+   * @return The list of schemas in {@code INFORMATION_SCHEMA.SCHEMATA} table satisfying the given filters.
    */
   public DrillRpcFuture<GetSchemasResp> getSchemas(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter) {
     final GetSchemasReq.Builder reqBuilder = GetSchemasReq.newBuilder();
@@ -695,13 +700,13 @@
   }
 
   /**
-   * Get the list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters.
+   * Get the list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters.
    *
-   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
-   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
-   * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter.
-   * @param tableTypeFilter Filter in <code>table type</code>. Pass null to apply no filter
-   * @return The list of tables in <code>INFORMATION_SCHEMA.TABLES</code> table satisfying the given filters.
+   * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+   * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter.
+   * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter.
+   * @param tableTypeFilter Filter in {@code table type}. Pass null to apply no filter
+   * @return The list of tables in {@code INFORMATION_SCHEMA.TABLES} table satisfying the given filters.
    */
   public DrillRpcFuture<GetTablesResp> getTables(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
       LikeFilter tableNameFilter, List<String> tableTypeFilter) {
@@ -726,13 +731,13 @@
   }
 
   /**
-   * Get the list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters.
+   * Get the list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters.
    *
-   * @param catalogNameFilter Filter on <code>catalog name</code>. Pass null to apply no filter.
-   * @param schemaNameFilter Filter on <code>schema name</code>. Pass null to apply no filter.
-   * @param tableNameFilter Filter in <code>table name</code>. Pass null to apply no filter.
-   * @param columnNameFilter Filter in <code>column name</code>. Pass null to apply no filter.
-   * @return The list of columns in <code>INFORMATION_SCHEMA.COLUMNS</code> table satisfying the given filters.
+   * @param catalogNameFilter Filter on {@code catalog name}. Pass null to apply no filter.
+   * @param schemaNameFilter Filter on {@code schema name}. Pass null to apply no filter.
+   * @param tableNameFilter Filter in {@code table name}. Pass null to apply no filter.
+   * @param columnNameFilter Filter in {@code column name}. Pass null to apply no filter.
+   * @return The list of columns in {@code INFORMATION_SCHEMA.COLUMNS} table satisfying the given filters.
    */
   public DrillRpcFuture<GetColumnsResp> getColumns(LikeFilter catalogNameFilter, LikeFilter schemaNameFilter,
       LikeFilter tableNameFilter, LikeFilter columnNameFilter) {
@@ -757,10 +762,10 @@
   }
 
   /**
-   * Create a prepared statement for given the <code>query</code>.
+   * Create a prepared statement for given the {@code query}.
    *
    * @param query
-   * @return The prepared statement for given the <code>query</code>.
+   * @return The prepared statement for given the {@code query}.
    */
   public DrillRpcFuture<CreatePreparedStatementResp> createPreparedStatement(final String query) {
     final CreatePreparedStatementReq req =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
index 47a364a..c751897 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -250,7 +250,7 @@
       System.out.println(getBatchMetaInfo(vcSerializable).toString());
 
       System.out.println("Schema Information");
-      for (final VectorWrapper w : vectorContainer) {
+      for (final VectorWrapper<?> w : vectorContainer) {
         final MaterializedField field = w.getValueVector().getField();
         System.out.println (String.format("name : %s, minor_type : %s, data_mode : %s",
                                           field.getName(),
@@ -279,7 +279,7 @@
       selectedRows = vcSerializable.getSv2().getCount();
     }
 
-    for (final VectorWrapper w : vectorContainer) {
+    for (final VectorWrapper<?> w : vectorContainer) {
        totalDataSize += w.getValueVector().getBufferSize();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java
index 19e72ff..e90cfae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/InvalidConnectionInfoException.java
@@ -22,6 +22,7 @@
 /**
  * Exception for malformed connection string from client
  */
+@SuppressWarnings("serial")
 public class InvalidConnectionInfoException extends NonTransientRpcException {
 
   public InvalidConnectionInfoException(String message) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
index 25e472f..25331da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/LoggingResultsListener.java
@@ -36,11 +36,13 @@
 import org.apache.drill.exec.util.VectorUtil;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.DrillBuf;
 
 public class LoggingResultsListener implements UserResultsListener {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoggingResultsListener.class);
+  private static Logger logger = LoggerFactory.getLogger(LoggingResultsListener.class);
 
   private final AtomicInteger count = new AtomicInteger();
   private final Stopwatch w = Stopwatch.createUnstarted();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index d60f6ae..5a5f725 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -201,7 +201,5 @@
       watch.reset();
     }
     return 0;
-
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
index c0c1e0b..7510cf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingUserConnection.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.ops;
 
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.UserClientConnection;
@@ -31,14 +31,15 @@
   private final SendingAccountor sendingAccountor;
   private final RpcOutcomeListener<Ack> statusHandler;
 
-  public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor, RpcOutcomeListener<Ack> statusHandler) {
+  public AccountingUserConnection(UserClientConnection connection, SendingAccountor sendingAccountor,
+      RpcOutcomeListener<Ack> statusHandler) {
     this.connection = connection;
     this.sendingAccountor = sendingAccountor;
     this.statusHandler = statusHandler;
   }
 
-  public void sendData(QueryWritableBatch batch) {
+  public void sendData(QueryDataPackage data) {
     sendingAccountor.increment();
-    connection.sendData(statusHandler, batch);
+    connection.sendData(statusHandler, data);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 0f7ca13..3f71cbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -19,17 +19,14 @@
 
 import java.util.List;
 
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.AccountingUserConnection;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
-import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage.DataPackage;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage.EmptyResultsPackage;
 import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -49,13 +46,18 @@
     return new ScreenRoot(context, children.iterator().next(), config);
   }
 
+  /**
+   * Transfer batches to a user connection. The user connection is typically a
+   * network connection, but may be internal for a web or REST client. Data is
+   * sent as a "package", allowing the network client to request serialization,
+   * and the internal client to just transfer buffer ownership.
+   */
   public static class ScreenRoot extends BaseRootExec {
     private static final Logger logger = LoggerFactory.getLogger(ScreenRoot.class);
     private final RecordBatch incoming;
     private final RootFragmentContext context;
     private final AccountingUserConnection userConnection;
-    private RecordMaterializer materializer;
-
+    private DataPackage dataPackage;
     private boolean firstBatch = true;
 
     public enum Metric implements MetricDef {
@@ -67,15 +69,11 @@
       }
     }
 
-    public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+    public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) {
       super(context, config);
       this.context = context;
       this.incoming = incoming;
-      userConnection = context.getUserDataTunnel();
-    }
-
-    public RootFragmentContext getContext() {
-      return context;
+      this.userConnection = context.getUserDataTunnel();
     }
 
     @Override
@@ -85,53 +83,40 @@
       switch (outcome) {
         case NONE:
           if (firstBatch) {
-            // this is the only data message sent to the client and may contain the schema
-            QueryWritableBatch batch;
-            QueryData header = QueryData.newBuilder()
-              .setQueryId(context.getHandle().getQueryId())
-              .setRowCount(0)
-              .setDef(RecordBatchDef.getDefaultInstance())
-              .build();
-            batch = new QueryWritableBatch(header);
 
             stats.startWait();
             try {
-              userConnection.sendData(batch);
+              // This is the only data message sent to the client and does not contain the schema
+              userConnection.sendData(new EmptyResultsPackage(context.getHandle().getQueryId()));
             } finally {
               stats.stopWait();
             }
             firstBatch = false; // we don't really need to set this. But who knows!
           }
-
           return false;
+
         case OK_NEW_SCHEMA:
-          materializer = new VectorRecordMaterializer(context, oContext, incoming);
+          dataPackage = new DataPackage(new VectorRecordMaterializer(context, oContext, incoming), stats);
           //$FALL-THROUGH$
         case OK:
           injector.injectPause(context.getExecutionControls(), "sending-data", logger);
-          final QueryWritableBatch batch = materializer.convertNext();
-          updateStats(batch);
           stats.startWait();
           try {
-            userConnection.sendData(batch);
+            // Stats updated if connection serializes the batch
+            userConnection.sendData(dataPackage);
           } finally {
             stats.stopWait();
           }
           firstBatch = false;
-
           return true;
+
         default:
           throw new UnsupportedOperationException(outcome.name());
       }
     }
 
-    public void updateStats(QueryWritableBatch queryBatch) {
-      stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
-    }
-
-    RecordBatch getIncoming() {
-      return incoming;
-    }
+    public RootFragmentContext getContext() { return context; }
+    protected RecordBatch getIncoming() { return incoming; }
 
     @Override
     public void close() throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index a3d03c2..ec22632 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -24,9 +24,12 @@
 import org.apache.drill.exec.record.TransferPair;
 
 public interface Filterer {
-  TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
-  TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
+  TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 =
+      new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
+  TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 =
+      new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
 
-  void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
+  void setup(FragmentContext context, RecordBatch incoming,
+      RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
   void filterBatch(int recordCount) throws SchemaChangeException;
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java
new file mode 100644
index 0000000..f9a933d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryDataPackage.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.materialize;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot.Metric;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+
+/**
+ * Packages a batch from the Screen operator to send to its
+ * user connection. In the original Drill, that connection was always a
+ * network connection, and so the outgoing batch is serialized to a set
+ * of buffers ready to send. However, the REST server runs in the same process.
+ * The original REST query implementation serialized the data to buffers, then
+ * copied the data to a big buffer to be deserialized, causing significant memory
+ * pressure. This version allows the user connection to elect for serialization,
+ * or just to access the original source batch.
+ */
+public interface QueryDataPackage {
+
+  QueryId queryId();
+  QueryWritableBatch toWritableBatch();
+  VectorContainer batch();
+  List<SerializedField> fields();
+
+  /**
+   * Package that contains only a query ID. Send for a query that
+   * finishes with no data. The results are null: no data, no schema.
+   */
+  public static class EmptyResultsPackage implements QueryDataPackage {
+
+    private final QueryId queryId;
+
+    public EmptyResultsPackage(QueryId queryId) {
+      this.queryId = queryId;
+    }
+
+    @Override
+    public QueryId queryId() { return queryId; }
+
+    /**
+     * Creates a message that sends only the query ID to the
+     * client.
+     */
+    @Override
+    public QueryWritableBatch toWritableBatch() {
+      QueryData header = QueryData.newBuilder()
+        .setQueryId(queryId)
+        .setRowCount(0)
+        .setDef(RecordBatchDef.getDefaultInstance())
+        .build();
+      return new QueryWritableBatch(header);
+    }
+
+    @Override
+    public VectorContainer batch() { return null; }
+
+    @Override
+    public List<SerializedField> fields() {
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Represents a batch of data with a schema.
+   */
+  public static class DataPackage implements QueryDataPackage {
+    private final RecordMaterializer materializer;
+    private final OperatorStats stats;
+
+    public DataPackage(RecordMaterializer materializer, OperatorStats stats) {
+      this.materializer = materializer;
+      this.stats = stats;
+    }
+
+    @Override
+    public QueryId queryId() { return materializer.queryId(); }
+
+    @Override
+    public QueryWritableBatch toWritableBatch() {
+      QueryWritableBatch batch = materializer.convertNext();
+      stats.addLongStat(Metric.BYTES_SENT, batch.getByteCount());
+      return batch;
+    }
+
+    @Override
+    public VectorContainer batch() {
+      return materializer.incoming();
+    }
+
+    @Override
+    public List<SerializedField> fields() {
+      List<SerializedField> metadata = new ArrayList<>();
+      for (VectorWrapper<?> vw : batch()) {
+        metadata.add(vw.getValueVector().getMetadata());
+      }
+      return metadata;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index e69bd51..f9537b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -24,7 +24,6 @@
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 
 public class QueryWritableBatch {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
 
   private final QueryData header;
   private final ByteBuf[] buffers;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
index 75de592..9c2f7ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -17,9 +17,14 @@
  */
 package org.apache.drill.exec.physical.impl.materialize;
 
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.VectorContainer;
 
 public interface RecordMaterializer {
 
   public QueryWritableBatch convertNext();
 
+  public QueryId queryId();
+
+  public VectorContainer incoming();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index 7cdf9b3..c294774 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -25,16 +25,16 @@
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionSet;
 
 public class VectorRecordMaterializer implements RecordMaterializer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class);
 
-  private QueryId queryId;
-  private RecordBatch batch;
-  private BufferAllocator allocator;
-  private OptionManager options;
+  private final QueryId queryId;
+  private final RecordBatch batch;
+  private final BufferAllocator allocator;
+  private final boolean resultResultsForDDL;
 
   public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) {
     this.queryId = context.getHandle().getQueryId();
@@ -42,19 +42,27 @@
     this.allocator = oContext.getAllocator();
     BatchSchema schema = batch.getSchema();
     assert schema != null : "Schema must be defined.";
-    options = context.getOptions();
+    OptionSet options = context.getOptions();
+    this.resultResultsForDDL = options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL);
   }
 
+  @Override
   public QueryWritableBatch convertNext() {
     WritableBatch w = batch.getWritableBatch().transfer(allocator);
     QueryData.Builder builder = QueryData.newBuilder()
         .setQueryId(queryId)
         .setRowCount(batch.getRecordCount())
         .setDef(w.getDef());
-    if (!options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL)) {
+    if (!resultResultsForDDL) {
       int count = w.getDef().getAffectedRowsCount();
       builder.setAffectedRowsCount(count == -1 ? 0 : count);
     }
     return new QueryWritableBatch(builder.build(), w.getBuffers());
   }
+
+  @Override
+  public QueryId queryId() { return queryId; }
+
+  @Override
+  public VectorContainer incoming() { return batch.getContainer(); }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index cb835e7..d4087fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -25,7 +25,7 @@
  * Implements an AbstractUnaryRecordBatch where the incoming record batch is
  * known at the time of creation
  *
- * @param <T>
+ * @param <T> the plan definition of the operator
  */
 public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractUnaryRecordBatch<T> {
 
@@ -43,12 +43,15 @@
   }
 
   /**
-   * Based on lastKnownOutcome and if there are more records to be output for current record boundary detected by
-   * EMIT outcome, this method returns EMIT or OK outcome.
+   * Based on lastKnownOutcome and if there are more records to be output for
+   * current record boundary detected by EMIT outcome, this method returns EMIT
+   * or OK outcome.
+   *
    * @param hasMoreRecordInBoundary
-   * @return - EMIT - If the lastknownOutcome was EMIT and output records corresponding to all the incoming records in
-   * current record boundary is already produced.
-   *         - OK - otherwise
+   * @return EMIT - If the lastknownOutcome was EMIT and output records
+   *         corresponding to all the incoming records in current record
+   *         boundary is already produced.
+   *         OK - otherwise
    */
   protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
     final IterOutcome lastOutcome = getLastKnownOutcome();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 6d8865d..2ee1047 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -30,7 +30,6 @@
  * {@link org.apache.drill.exec.record.metadata.TupleMetadata} instead.
  */
 public class BatchSchema implements Iterable<MaterializedField> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
 
   private final SelectionVectorMode selectionVectorMode;
   private final List<MaterializedField> fields;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 064c601..1eb2ac0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -51,7 +51,7 @@
   private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
 
   private final BufferAllocator allocator;
-  private VectorContainer container = new VectorContainer();
+  private VectorContainer container;
   private int valueCount;
   private BatchSchema schema;
 
@@ -60,6 +60,7 @@
    */
   public RecordBatchLoader(BufferAllocator allocator) {
     this.allocator = Preconditions.checkNotNull(allocator);
+    this.container = new VectorContainer(allocator);
   }
 
   public BufferAllocator allocator() { return allocator; }
@@ -116,12 +117,10 @@
           vector = TypeHelper.getNewVector(fieldDef, allocator);
 
         // If the field is a map or a dict, check if the schema changed.
-
         } else if ((vector.getField().getType().getMinorType() == MinorType.MAP || vector.getField().getType().getMinorType() == MinorType.DICT) &&
                    ! isSameSchema(vector.getField().getChildren(), field.getChildList())) {
 
           // The schema changed. Discard the old one and create a new one.
-
           schemaChanged = true;
           vector.clear();
           vector = TypeHelper.getNewVector(fieldDef, allocator);
@@ -155,8 +154,8 @@
       container = newVectors;
       container.setRecordCount(valueCount);
     } catch (final Throwable cause) {
-      // We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should
-      // adjudicate to call upper layer specific clean up logic.
+      // We have to clean up new vectors created here and pass over the actual cause.
+      // It is upper layer who should adjudicate to call upper layer specific clean up logic.
       VectorAccessibleUtilities.clear(newVectors);
       throw cause;
     } finally {
@@ -190,7 +189,6 @@
 
     // Column order can permute (see DRILL-5828). So, use a map
     // for matching.
-
     Map<String, MaterializedField> childMap = CaseInsensitiveMap.newHashMap();
     for (MaterializedField currentChild : currentChildren) {
       childMap.put(currentChild.getName(), currentChild);
@@ -199,13 +197,11 @@
       MaterializedField currentChild = childMap.get(newChild.getNamePart().getName());
 
       // New map member?
-
       if (currentChild == null) {
         return false;
       }
 
       // Changed data type?
-
       if (! currentChild.getType().equals(newChild.getMajorType())) {
         return false;
       }
@@ -223,7 +219,6 @@
     }
 
     // Everything matches.
-
     return true;
   }
 
@@ -232,20 +227,6 @@
     return container.getValueVectorId(path);
   }
 
-//
-//  @SuppressWarnings("unchecked")
-//  public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> clazz) {
-//    ValueVector v = container.get(fieldId);
-//    assert v != null;
-//    if (v.getClass() != clazz){
-//      logger.warn(String.format(
-//          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s.",
-//          clazz.getCanonicalName(), v.getClass().getCanonicalName()));
-//      return null;
-//    }
-//    return (T) v;
-//  }
-
   @Override
   public int getRecordCount() { return valueCount; }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 966ade7..1b30d78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -19,6 +19,7 @@
 
 import io.netty.buffer.DrillBuf;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -28,7 +29,6 @@
 import org.apache.drill.exec.vector.ValueVector;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
  * A specialized version of record batch that can moves out buffers and preps
@@ -52,7 +52,7 @@
   }
 
   public WritableBatch transfer(BufferAllocator allocator) {
-    List<DrillBuf> newBuffers = Lists.newArrayList();
+    List<DrillBuf> newBuffers = new ArrayList<>();
     for (DrillBuf buf : buffers) {
       int writerIndex = buf.writerIndex();
       DrillBuf newBuf = buf.transferOwnership(allocator).buffer;
@@ -135,7 +135,7 @@
   }
 
   public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
-    List<ValueVector> vectors = Lists.newArrayList();
+    List<ValueVector> vectors = new ArrayList<>();
     for (VectorWrapper<?> vw : vws) {
       Preconditions.checkArgument(!vw.isHyper());
       vectors.add(vw.getValueVector());
@@ -144,8 +144,8 @@
   }
 
   public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
-    List<DrillBuf> buffers = Lists.newArrayList();
-    List<SerializedField> metadata = Lists.newArrayList();
+    List<DrillBuf> buffers = new ArrayList<>();
+    List<SerializedField> metadata = new ArrayList<>();
 
     for (ValueVector vv : vectors) {
       metadata.add(vv.getMetadata());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
index 2934c34..0fd2862 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractDisposableUserClientConnection.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.rpc;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -29,13 +31,14 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * Helps to run a query and await on the results. All the inheriting sub-class manages the session/connection
- * state and submits query with respect to that state. The subclass instance lifetime is per query lifetime
- * and is not re-used.
+ * Helps to run a query and await on the results. All the inheriting sub-class
+ * manages the session/connection state and submits query with respect to that
+ * state. The subclass instance lifetime is per query lifetime and is not
+ * re-used.
  */
 public abstract class AbstractDisposableUserClientConnection implements UserClientConnection {
-  private static final org.slf4j.Logger logger =
-      org.slf4j.LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(AbstractDisposableUserClientConnection.class);
 
   protected final CountDownLatch latch = new CountDownLatch(1);
 
@@ -72,7 +75,8 @@
     final QueryId queryId = result.getQueryId();
 
     if (logger.isDebugEnabled()) {
-      logger.debug("Result arrived for QueryId: {} with QueryState: {}", QueryIdHelper.getQueryId(queryId), state);
+      logger.debug("Result arrived for QueryId: {} with QueryState: {}",
+          QueryIdHelper.getQueryId(queryId), state);
     }
 
     switch (state) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
index 1372b29..179cc7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/UserClientConnection.java
@@ -18,7 +18,8 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.channel.ChannelFuture;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -26,12 +27,14 @@
 import java.net.SocketAddress;
 
 /**
- * Interface for getting user session properties and interacting with user connection. Separating this interface from
- * {@link AbstractRemoteConnection} implementation for user connection:
+ * Interface for getting user session properties and interacting with user
+ * connection. Separating this interface from {@link AbstractRemoteConnection}
+ * implementation for user connection:
  * <p><ul>
- * <li> Connection is passed to Foreman and Screen operators. Instead passing this interface exposes few details.
- * <li> Makes it easy to have wrappers around user connection which can be helpful to tap the messages and data
- * going to the actual client.
+ * <li>Connection is passed to Foreman and Screen operators. Instead passing
+ * this interface exposes few details.
+ * <li>Makes it easy to have wrappers around user connection which can be
+ * helpful to tap the messages and data going to the actual client.
  * </ul>
  */
 public interface UserClientConnection {
@@ -41,7 +44,7 @@
   UserSession getSession();
 
   /**
-   * Send query result outcome to client. Outcome is returned through <code>listener</code>
+   * Send query result outcome to client. Outcome is returned through {@code listener}.
    *
    * @param listener
    * @param result
@@ -49,12 +52,12 @@
   void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result);
 
   /**
-   * Send query data to client. Outcome is returned through <code>listener</code>
+   * Send query data to client. Outcome is returned through {@code listener}.
    *
    * @param listener
    * @param result
    */
-  void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result);
+  void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data);
 
   /**
    * Returns the {@link ChannelFuture} which will be notified when this
@@ -66,4 +69,4 @@
    * @return Return the client node address.
    */
   SocketAddress getRemoteAddress();
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
index 461d8aa..25f1c68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/AwaitableUserResultsListener.java
@@ -78,5 +78,4 @@
     }
     return count.get();
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 99b0723..460dbb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -120,13 +120,13 @@
 
     try {
       if (isFailureResult) {
-        // Failure case--pass on via submissionFailed(...).
 
+        // Failure case--pass on via submissionFailed(...).
         resultsListener.submissionFailed(new UserRemoteException(queryResult.getError(0)));
         // Note: Listener is removed in finally below.
       } else if (isTerminalResult) {
-        // A successful completion/canceled case--pass on via resultArrived
 
+        // A successful completion/canceled case--pass on via resultArrived
         try {
           resultsListener.queryCompleted(queryState);
         } catch (Exception e) {
@@ -189,9 +189,9 @@
     UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId);
     logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener);
     if (null == resultsListener) {
+
       // WHO?? didn't get query ID response and set submission listener yet,
       // so install a buffering listener for now
-
       BufferingResultsListener bl = new BufferingResultsListener();
       resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl);
       // If we had a successful insertion, use that reference.  Otherwise, just
@@ -272,8 +272,7 @@
     }
 
     @Override
-    public void queryIdArrived(QueryId queryId) {
-    }
+    public void queryIdArrived(QueryId queryId) { }
   }
 
   private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 1d864f2..190e9ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -25,14 +25,21 @@
 public interface UserResultsListener {
 
   /**
-   * QueryId is available. Called when a query is successfully submitted to the server.
-   * @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK}
+   * QueryId is available. Called when a query is successfully submitted to the
+   * server.
+   *
+   * @param queryId
+   *          sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK
+   *          Acks.OK}
    */
   void queryIdArrived(QueryId queryId);
 
   /**
-   * The query has failed. Most likely called when the server returns a FAILED query state. Can also be called if
-   * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()} throws an exception
+   * The query has failed. Most likely called when the server returns a FAILED
+   * query state. Can also be called if
+   * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()}
+   * throws an exception
+   *
    * @param ex exception describing the cause of the failure
    */
   void submissionFailed(UserException ex);
@@ -45,10 +52,9 @@
   void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);
 
   /**
-   * The query has completed (successsful completion or cancellation). The listener will not receive any other
-   * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED)
-   * @param state
+   * The query has completed (successful completion or cancellation). The
+   * listener will not receive any other data or result message. Called when the
+   * server returns a terminal-non failing- state (COMPLETED or CANCELLED)
    */
   void queryCompleted(QueryState state);
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 276758e..cb1db13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -31,6 +31,7 @@
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.security.HadoopKerberosName;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLEngine;
 import javax.security.sasl.SaslException;
@@ -75,7 +77,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
+  private static final Logger logger = LoggerFactory.getLogger(UserServer.class);
   private static final String SERVER_NAME = "Apache Drill Server";
 
   private final UserConnectionConfig config;
@@ -84,7 +86,7 @@
   private final UserWorker userWorker;
   private static final ConcurrentHashMap<BitToUserConnection, BitToUserConnectionConfig> userConnectionMap;
 
-  //Initializing the singleton map during startup
+  // Initialize the singleton map during startup
   static {
     userConnectionMap = new ConcurrentHashMap<>();
   }
@@ -195,11 +197,13 @@
   }
 
   /**
-   * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection
-   * is used to get hold of {@link UserSession} which stores all session related information like session options
-   * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a
-   * UserSession. This connection object is also used to send query data and result back to the client submitted as part
-   * of the session tied to this connection.
+   * Represents a client connection accepted by Foreman Drillbit's UserServer
+   * from a DrillClient. This connection is used to get hold of
+   * {@link UserSession} which stores all session related information like
+   * session options changed over the lifetime of this connection. There is a
+   * 1:1 mapping between a BitToUserConnection and a UserSession. This
+   * connection object is also used to send query data and result back to the
+   * client submitted as part of the session tied to this connection.
    */
   public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection>
       implements UserClientConnection {
@@ -240,7 +244,6 @@
      * Sets the user on the session, and finalizes the session.
      *
      * @param userName user name to set on the session
-     *
      */
     void finalizeSession(String userName) {
       // create a session
@@ -262,9 +265,10 @@
     }
 
     @Override
-    public UserSession getSession(){
-      return session;
-    }
+    public UserSession getSession() { return session; }
+
+    @Override
+    protected Logger getLogger() { return logger; }
 
     @Override
     public void sendResult(final RpcOutcomeListener<Ack> listener, final QueryResult result) {
@@ -273,17 +277,13 @@
     }
 
     @Override
-    public void sendData(final RpcOutcomeListener<Ack> listener, final QueryWritableBatch result) {
+    public void sendData(final RpcOutcomeListener<Ack> listener, final QueryDataPackage data) {
+      QueryWritableBatch result = data.toWritableBatch();
       logger.trace("Sending data to client with {}", result);
       send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, false, result.getBuffers());
     }
 
     @Override
-    protected Logger getLogger() {
-      return logger;
-    }
-
-    @Override
     public ChannelFuture getChannelClosureFuture() {
       return getChannel().closeFuture()
           .addListener(new GenericFutureListener<Future<? super Void>>() {
@@ -504,10 +504,10 @@
    * User Connection's config for System Table access
    */
   public class BitToUserConnectionConfig {
-    private DateTime established;
-    private boolean isAuthEnabled;
-    private boolean isEncryptionEnabled;
-    private boolean isSSLEnabled;
+    private final DateTime established;
+    private final boolean isAuthEnabled;
+    private final boolean isEncryptionEnabled;
+    private final boolean isSSLEnabled;
 
     public BitToUserConnectionConfig() {
       established = new DateTime(); //Current Joda-based Time
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index 0798dea..a7594f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -49,11 +49,13 @@
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class UserSession implements AutoCloseable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserSession.class);
+  private static final Logger logger = LoggerFactory.getLogger(UserSession.class);
 
-  private boolean supportComplexTypes = false;
+  private boolean supportComplexTypes;
   private UserCredentials credentials;
   private DrillProperties properties;
   private SessionOptionManager sessionOptions;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
new file mode 100644
index 0000000..42d93f2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/BaseWebUserConnection.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import java.net.SocketAddress;
+
+import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.user.UserSession;
+
+import io.netty.channel.ChannelFuture;
+
+public abstract class BaseWebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
+
+  protected WebSessionResources webSessionResources;
+
+  public BaseWebUserConnection(WebSessionResources webSessionResources) {
+    this.webSessionResources = webSessionResources;
+  }
+
+  @Override
+  public UserSession getSession() {
+    return webSessionResources.getSession();
+  }
+
+  @Override
+  public ChannelFuture getChannelClosureFuture() {
+    return webSessionResources.getCloseFuture();
+  }
+
+  @Override
+  public SocketAddress getRemoteAddress() {
+    return webSessionResources.getRemoteAddress();
+  }
+
+  @Override
+  public void setAutoRead(boolean enableAutoRead) { }
+
+  public WebSessionResources resources() {
+    return webSessionResources;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 141c027..c345571 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -237,9 +237,7 @@
     }
 
     @Override
-    public void dispose(WebUserConnection instance) {
-
-    }
+    public void dispose(WebUserConnection instance) { }
   }
 
   public static class AnonWebUserConnectionProvider implements Factory<WebUserConnection> {
@@ -300,14 +298,14 @@
     }
 
     @Override
-    public void dispose(WebUserConnection instance) {
-
-    }
+    public void dispose(WebUserConnection instance) { }
 
     /**
-     * Creates session user principal. If impersonation is enabled without authentication and User-Name header is present and valid,
-     * will create session user principal with provided user name, otherwise anonymous user name will be used.
-     * In both cases session user principal will have admin rights.
+     * Creates session user principal. If impersonation is enabled without
+     * authentication and User-Name header is present and valid, will create
+     * session user principal with provided user name, otherwise anonymous user
+     * name will be used. In both cases session user principal will have admin
+     * rights.
      *
      * @param config drill config
      * @param request client request
@@ -322,10 +320,12 @@
       }
       return new AnonDrillUserPrincipal();
     }
-
   }
 
-  // Provider which injects DrillUserPrincipal directly instead of getting it from SecurityContext and typecasting
+  /**
+   * Provider which injects DrillUserPrincipal directly instead of getting it
+   * from SecurityContext and typecasting
+   */
   public static class DrillUserPrincipalProvider implements Factory<DrillUserPrincipal> {
 
     @Inject HttpServletRequest request;
@@ -336,9 +336,7 @@
     }
 
     @Override
-    public void dispose(DrillUserPrincipal principal) {
-      // No-Op
-    }
+    public void dispose(DrillUserPrincipal principal) { }
   }
 
   // Provider which creates and cleanups DrillUserPrincipal for anonymous (auth disabled) mode
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
index 4aa2061..e29050a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
@@ -150,7 +150,7 @@
   private File getFileByName(File folder, final String name) {
     File[] files = folder.listFiles((dir, fileName) -> fileName.equals(name));
     if (files.length == 0) {
-      throw new DrillRuntimeException (name + " doesn't exist");
+      throw new DrillRuntimeException(name + " doesn't exist");
     }
     return files[0];
   }
@@ -159,9 +159,9 @@
   @XmlRootElement
   public class Log implements Comparable<Log> {
 
-    private String name;
-    private long size;
-    private DateTime lastModified;
+    private final String name;
+    private final long size;
+    private final DateTime lastModified;
 
     @JsonCreator
     public Log (@JsonProperty("name") String name, @JsonProperty("size") long size, @JsonProperty("lastModified") long lastModified) {
@@ -190,9 +190,9 @@
 
   @XmlRootElement
   public class LogContent {
-    private String name;
-    private Collection<String> lines;
-    private int maxLines;
+    private final String name;
+    private final Collection<String> lines;
+    private final int maxLines;
 
     @JsonCreator
     public LogContent (@JsonProperty("name") String name, @JsonProperty("lines") Collection<String> lines, @JsonProperty("maxLines") int maxLines) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 94fcea9..be4c331 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -22,6 +22,7 @@
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.shaded.guava.com.google.common.base.CharMatcher;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.parquet.Strings;
@@ -98,7 +99,7 @@
   public static final class RestQueryBuilder {
 
     private String query;
-    private String queryType = "SQL";
+    private String queryType = QueryType.SQL.name();
     private int rowLimit;
     private String userName;
     private String defaultSchema;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
index e944386..69d81ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/RestQueryRunner.java
@@ -62,7 +62,7 @@
     this.options = webUserConnection.getSession().getOptions();
   }
 
-  public RestQueryRunner.QueryResult run() throws Exception {
+  public QueryResult run() throws Exception {
     applyUserName();
     applyOptions();
     applyDefaultSchema();
@@ -131,7 +131,7 @@
     return maxRows;
   }
 
-  public RestQueryRunner.QueryResult submitQuery(int maxRows) {
+  public QueryResult submitQuery(int maxRows) {
     final RunQuery runQuery = RunQuery.newBuilder()
         .setType(QueryType.valueOf(query.getQueryType()))
         .setPlan(query.getQuery())
@@ -161,7 +161,7 @@
       }
     } while (!isComplete && !nearlyOutOfHeapSpace);
 
-    //Fail if nearly out of heap space
+    // Fail if nearly out of heap space
     if (nearlyOutOfHeapSpace) {
       UserException almostOutOfHeapException = UserException.resourceError()
           .message("There is not enough heap memory to run this query using the web interface. ")
@@ -185,7 +185,7 @@
     return new QueryResult(queryId, webUserConnection, webUserConnection.results);
   }
 
-  //Detect possible excess heap
+  // Detect possible excess heap
   private float getHeapUsage() {
     return (float) memMXBean.getHeapMemoryUsage().getUsed() / memMXBean.getHeapMemoryUsage().getMax();
   }
@@ -198,15 +198,16 @@
     public final String queryState;
     public final int attemptedAutoLimit;
 
-    //DRILL-6847:  Modified the constructor so that the method has access to all the properties in webUserConnection
+    // DRILL-6847:  Modified the constructor so that the method has access
+    // to all the properties in webUserConnection
     public QueryResult(QueryId queryId, WebUserConnection webUserConnection, List<Map<String, String>> rows) {
-        this.queryId = QueryIdHelper.getQueryId(queryId);
-        this.columns = webUserConnection.columns;
-        this.metadata = webUserConnection.metadata;
-        this.queryState = webUserConnection.getQueryState();
-        this.rows = rows;
-        this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount();
-      }
+      this.queryId = QueryIdHelper.getQueryId(queryId);
+      this.columns = webUserConnection.columns;
+      this.metadata = webUserConnection.metadata;
+      this.queryState = webUserConnection.getQueryState();
+      this.rows = rows;
+      this.attemptedAutoLimit = webUserConnection.getAutoLimitRowCount();
+    }
 
     public String getQueryId() {
       return queryId;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 6819457..e00cb48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -262,12 +262,11 @@
     responseHeadersSettingFilter.setInitParameters(ResponseHeadersSettingFilter.retrieveResponseHeaders(config));
     servletContextHandler.addFilter(responseHeadersSettingFilter, "/*", EnumSet.of(DispatcherType.REQUEST));
 
-
     return servletContextHandler;
   }
 
   /**
-   * It creates A {@link SessionHandler} which contains a {@link HashSessionManager}
+   * Create a {@link SessionHandler} which contains a {@link HashSessionManager}
    *
    * @param securityHandler Set of init parameters that are used by the Authentication
    * @return session handler
@@ -279,9 +278,7 @@
     sessionManager.getSessionCookieConfig().setHttpOnly(true);
     sessionManager.addEventListener(new HttpSessionListener() {
       @Override
-      public void sessionCreated(HttpSessionEvent se) {
-
-      }
+      public void sessionCreated(HttpSessionEvent se) { }
 
       @Override
       public void sessionDestroyed(HttpSessionEvent se) {
@@ -338,8 +335,9 @@
   }
 
   /**
-   * Create an HTTPS connector for given jetty server instance. If the admin has specified keystore/truststore settings
-   * they will be used else a self-signed certificate is generated and used.
+   * Create an HTTPS connector for given jetty server instance. If the admin has
+   * specified keystore/truststore settings they will be used else a self-signed
+   * certificate is generated and used.
    *
    * @return Initialized {@link ServerConnector} for HTTPS connections.
    */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
index 016278f..c06770e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebSessionResources.java
@@ -22,22 +22,23 @@
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.rpc.ChannelClosedException;
 import org.apache.drill.exec.rpc.user.UserSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.SocketAddress;
 
 /**
- * Class holding all the resources required for Web User Session. This class is responsible for the proper cleanup of
- * all the resources.
+ * Holds the resources required for Web User Session. This class is responsible
+ * for the proper cleanup of all the resources.
  */
 public class WebSessionResources implements AutoCloseable {
+  private static final Logger logger = LoggerFactory.getLogger(WebSessionResources.class);
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebSessionResources.class);
-
-  private BufferAllocator allocator;
+  private final BufferAllocator allocator;
 
   private final SocketAddress remoteAddress;
 
-  private UserSession webUserSession;
+  private final UserSession webUserSession;
 
   private ChannelPromise closeFuture;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
index 0ca6abe..9427fc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebUserConnection.java
@@ -17,177 +17,128 @@
  */
 package org.apache.drill.exec.server.rest;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.util.ValueVectorElementFormatter;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-import io.netty.channel.ChannelFuture;
-import org.apache.drill.common.exceptions.UserException;
+
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
 import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.vector.ValueVector.Accessor;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 
-import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.Set;
 
 /**
- * WebUserConnectionWrapper which represents the UserClientConnection between WebServer and Foreman, for the WebUser
- * submitting the query. It provides access to the UserSession executing the query. There is no actual physical
+ * {@code WebUserConnectionWrapper} which represents the {@code UserClientConnection} between
+ * WebServer and Foreman, for the WebUser submitting the query. It provides
+ * access to the {@code UserSession} executing the query. There is no actual physical
  * channel corresponding to this connection wrapper.
  *
- * It returns a close future with no actual underlying {@link io.netty.channel.Channel} associated with it but do have an
- * EventExecutor out of BitServer EventLoopGroup. Since there is no actual connection established using this class,
- * hence the close event will never be fired by underlying layer and close future is set only when the
+ * It returns a close future with no actual underlying
+ * {@link io.netty.channel.Channel} associated with it but do have an
+ * {@code EventExecutor} out of BitServer EventLoopGroup. Since there is no actual
+ * connection established using this class, hence the close event will never be
+ * fired by underlying layer and close future is set only when the
  * {@link WebSessionResources} are closed.
  */
-
-public class WebUserConnection extends AbstractDisposableUserClientConnection implements ConnectionThrottle {
-
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WebUserConnection.class);
-
-  protected WebSessionResources webSessionResources;
+public class WebUserConnection extends BaseWebUserConnection {
 
   public final List<Map<String, String>> results = Lists.newArrayList();
-
   public final Set<String> columns = Sets.newLinkedHashSet();
-
   public final List<String> metadata = new ArrayList<>();
-
   private int autoLimitRowCount;
+  private int rowCount;
 
   WebUserConnection(WebSessionResources webSessionResources) {
-    this.webSessionResources = webSessionResources;
+    super(webSessionResources);
   }
 
   @Override
-  public UserSession getSession() {
-    return webSessionResources.getSession();
+  public void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data) {
+    processBatch(data.batch());
+    listener.success(Acks.OK, null);
   }
 
-  @Override
-  public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
-    // There can be overflow here but DrillBuf doesn't support allocating with
-    // bytes in long. Hence we are just preserving the earlier behavior and logging debug log for the case.
-    final int dataByteCount = (int) result.getByteCount();
-
-    if (dataByteCount < 0) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("There is BufferOverflow in dataByteCount: {}",
-            dataByteCount);
-      }
-      listener.success(Acks.OK, null);
+  private void processBatch(VectorContainer batch) {
+    if (batch == null) {
+      // Empty query: no data, no schema.
       return;
     }
 
-    // Create a ByteBuf with all the data in it.
-    final int rows = result.getHeader().getRowCount();
-    final BufferAllocator allocator = webSessionResources.getAllocator();
-    final DrillBuf bufferWithData = allocator.buffer(dataByteCount);
-    try {
-      final ByteBuf[] resultDataBuffers = result.getBuffers();
+    // Build metadata only on first batch, or if the schema changes
+    if (metadata.isEmpty() || batch.isSchemaChanged()) {
+      columns.clear();
+      metadata.clear();
+      buildMetadata(batch.getSchema());
+    }
+    addResults(batch.getRecordCount(), batch);
+    batch.zeroVectors();
+  }
 
-      for (final ByteBuf buffer : resultDataBuffers) {
-        bufferWithData.writeBytes(buffer);
-        buffer.release();
-      }
+  private void buildMetadata(BatchSchema schema) {
+    for (int i = 0; i < schema.getFieldCount(); ++i) {
+      // DRILL-6847:  This section adds query metadata to the REST results
+      MaterializedField col = schema.getColumn(i);
+      columns.add(col.getName());
+      StringBuilder dataType = new StringBuilder(col.getType().getMinorType().name());
 
-      final RecordBatchLoader loader = new RecordBatchLoader(allocator);
-      try {
-        loader.load(result.getHeader().getDef(), bufferWithData);
-        // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-        // SchemaChangeException, so check/clean catch clause below.
-        for (int i = 0; i < loader.getSchema().getFieldCount(); ++i) {
-          //DRILL-6847:  This section adds query metadata to the REST results
-          MaterializedField col = loader.getSchema().getColumn(i);
-          columns.add(col.getName());
-          StringBuilder dataType = new StringBuilder(col.getType().getMinorType().name());
+      // For DECIMAL type
+      if (col.getType().hasPrecision()) {
+        dataType.append("(");
+        dataType.append(col.getType().getPrecision());
 
-          //For DECIMAL type
-          if (col.getType().hasPrecision()) {
-            dataType.append("(");
-            dataType.append(col.getType().getPrecision());
-
-            if (col.getType().hasScale()) {
-              dataType.append(", ");
-              dataType.append(col.getType().getScale());
-            }
-
-            dataType.append(")");
-          } else if (col.getType().hasWidth()) {
-            //Case for VARCHAR columns with specified width
-            dataType.append("(");
-            dataType.append(col.getType().getWidth());
-            dataType.append(")");
-          }
-          metadata.add(dataType.toString());
+        if (col.getType().hasScale()) {
+          dataType.append(", ");
+          dataType.append(col.getType().getScale());
         }
-        ValueVectorElementFormatter formatter = new ValueVectorElementFormatter(webSessionResources.getSession().getOptions());
-        for (int i = 0; i < rows; ++i) {
-          final Map<String, String> record = Maps.newHashMap();
-          for (VectorWrapper<?> vw : loader) {
-            final String field = vw.getValueVector().getMetadata().getNamePart().getName();
-            final TypeProtos.MinorType fieldMinorType = vw.getValueVector().getMetadata().getMajorType().getMinorType();
-            final Accessor accessor = vw.getValueVector().getAccessor();
-            final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
-            final String display = value == null ? null : formatter.format(value, fieldMinorType);
-            record.put(field, display);
-          }
-          results.add(record);
-        }
-      } finally {
-        loader.clear();
+
+        dataType.append(")");
+      } else if (col.getType().hasWidth()) {
+        // Case for VARCHAR columns with specified width
+        dataType.append("(");
+        dataType.append(col.getType().getWidth());
+        dataType.append(")");
       }
-    } catch (Exception e) {
-      boolean verbose = webSessionResources.getSession().getOptions().getBoolean(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY);
-      // Wrapping the exception into UserException and then into DrillPBError.
-      // It will be thrown as exception in QueryWrapper class.
-      // It's verbosity depends on system option "exec.errors.verbose".
-      error = UserException.systemError(e).build(logger).getOrCreatePBError(verbose);
-    } finally {
-      // Notify the listener with ACK.OK both in error/success case because data was send successfully from Drillbit.
-      bufferWithData.release();
-      listener.success(Acks.OK, null);
+      metadata.add(dataType.toString());
     }
   }
 
-  @Override
-  public ChannelFuture getChannelClosureFuture() {
-    return webSessionResources.getCloseFuture();
-  }
-
-  @Override
-  public SocketAddress getRemoteAddress() {
-    return webSessionResources.getRemoteAddress();
-  }
-
-  @Override
-  public void setAutoRead(boolean enableAutoRead) {
-    // no-op
+  private void addResults(int rows, VectorAccessible batch) {
+    ValueVectorElementFormatter formatter = new ValueVectorElementFormatter(webSessionResources.getSession().getOptions());
+    if (autoLimitRowCount > 0) {
+      rows = Math.max(0, Math.min(rows, autoLimitRowCount - rowCount));
+    }
+    for (int i = 0; i < rows; ++i) {
+      rowCount++;
+      final Map<String, String> record = Maps.newHashMap();
+      for (VectorWrapper<?> vw : batch) {
+        final String field = vw.getValueVector().getMetadata().getNamePart().getName();
+        final TypeProtos.MinorType fieldMinorType = vw.getValueVector().getMetadata().getMajorType().getMinorType();
+        final Accessor accessor = vw.getValueVector().getAccessor();
+        final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
+        final String display = value == null ? null : formatter.format(value, fieldMinorType);
+        record.put(field, display);
+      }
+      results.add(record);
+    }
   }
 
   /**
    * For authenticated WebUser no cleanup of {@link WebSessionResources} is done since it's re-used
    * for all the queries until lifetime of the web session.
    */
-  public void cleanupSession() {
-    // no-op
-  }
+  public void cleanupSession() { }
 
   public static class AnonWebUserConnection extends WebUserConnection {
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index ac1b4cc..de49e28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -21,6 +21,8 @@
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.drill.common.SelfCleaningRunnable;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -60,11 +62,11 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments
- * running elsewhere.
+ * Manages the running fragments in a Drillbit. Periodically requests run-time
+ * stats updates from fragments running elsewhere.
  */
 public class WorkManager implements AutoCloseable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
+  private static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
 
   private static final int EXIT_TIMEOUT_MS = 5000;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
index ef018f6..5de560a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/prepare/PreparedStatementProvider.java
@@ -18,14 +18,14 @@
 package org.apache.drill.exec.work.prepare;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import io.netty.buffer.ByteBuf;
+
 import io.netty.channel.ChannelFuture;
 import org.apache.drill.common.exceptions.ErrorHelper;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
 import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
@@ -43,6 +43,7 @@
 import org.apache.drill.exec.proto.UserProtos.ResultColumnMetadata;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.rpc.AbstractDisposableUserClientConnection;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
@@ -53,6 +54,8 @@
 import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
 import org.apache.drill.exec.work.user.UserWorker;
 import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
 import java.net.SocketAddress;
@@ -72,11 +75,11 @@
  * Contains worker {@link Runnable} for creating a prepared statement and helper methods.
  */
 public class PreparedStatementProvider {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PreparedStatementProvider.class);
+  private static final Logger logger = LoggerFactory.getLogger(PreparedStatementProvider.class);
 
   /**
-   * Static list of mappings from {@link MinorType} to JDBC ResultSet class name (to be returned through
-   * {@link ResultSetMetaData#getColumnClassName(int)}.
+   * Static list of mappings from {@link MinorType} to JDBC ResultSet class name
+   * (to be returned through {@link ResultSetMetaData#getColumnClassName(int)}.
    */
   private static final Map<MinorType, String> DRILL_TYPE_TO_JDBC_CLASSNAME = ImmutableMap.<MinorType, String>builder()
       .put(MinorType.INT, Integer.class.getName())
@@ -186,9 +189,10 @@
   }
 
   /**
-   * Helper method to create {@link DrillPBError} and set it in <code>respBuilder</code>
+   * Helper method to create {@link DrillPBError} and set it in {@code respBuilder}
    */
-  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final RequestStatus status,
+  private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder,
+      final RequestStatus status,
       final Throwable ex, final String message, final ErrorType errorType) {
     respBuilder.setStatus(status);
     final String errorId = UUID.randomUUID().toString();
@@ -211,7 +215,7 @@
   }
 
   /**
-   * Helper method to log error and set given {@link DrillPBError} in <code>respBuilder</code>
+   * Helper method to log error and set given {@link DrillPBError} in {@code respBuilder}
    */
   private static void setErrorHelper(final CreatePreparedStatementResp.Builder respBuilder, final DrillPBError error,
       final String message) {
@@ -250,14 +254,14 @@
     }
 
     @Override
-    public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result) {
+    public void sendData(RpcOutcomeListener<Ack> listener, QueryDataPackage data) {
       // Save the query results schema and release the buffers.
-      if (fields == null) {
-        fields = result.getHeader().getDef().getFieldList();
-      }
-
-      for (ByteBuf buf : result.getBuffers()) {
-        buf.release();
+      VectorContainer batch = data.batch();
+      if (batch != null) {
+        if (fields == null) {
+          fields = data.fields();
+        }
+        batch.zeroVectors();
       }
 
       listener.success(Acks.OK, null);
@@ -287,16 +291,18 @@
     builder.setCatalogName(InfoSchemaConstants.IS_CATALOG_NAME);
 
     /**
-     * Designated column's schema name. Empty string if not applicable. Initial implementation defaults to empty string
-     * as we use LIMIT 0 queries to get the schema and schema info is lost. If we derive the schema from plan, we may
-     * get the right value.
+     * Designated column's schema name. Empty string if not applicable. Initial
+     * implementation defaults to empty string as we use LIMIT 0 queries to get
+     * the schema and schema info is lost. If we derive the schema from plan, we
+     * may get the right value.
      */
     builder.setSchemaName("");
 
     /**
-     * Designated column's table name. Not set if not applicable. Initial implementation defaults to empty string as
-     * we use LIMIT 0 queries to get the schema and table info is lost. If we derive the table from plan, we may get
-     * the right value.
+     * Designated column's table name. Not set if not applicable. Initial
+     * implementation defaults to empty string as we use LIMIT 0 queries to get
+     * the schema and table info is lost. If we derive the table from plan, we
+     * may get the right value.
      */
     builder.setTableName("");
 
@@ -327,7 +333,8 @@
     builder.setPrecision(Types.getPrecision(field.getMajorType()));
 
     /**
-     * Column's number of digits to right of the decimal point. 0 is returned for types where the scale is not applicable
+     * Column's number of digits to right of the decimal point. 0 is returned
+     * for types where the scale is not applicable
      */
     builder.setScale(Types.getScale(majorType));
 
@@ -342,8 +349,8 @@
     builder.setDisplaySize(Types.getJdbcDisplaySize(majorType));
 
     /**
-     * Is the column an aliased column. Initial implementation defaults to true as we derive schema from LIMIT 0 query and
-     * not plan
+     * Is the column an aliased column. Initial implementation defaults to true
+     * as we derive schema from LIMIT 0 query and not plan
      */
     builder.setIsAliased(true);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 9c32b56..976820d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -41,9 +41,11 @@
 import org.apache.drill.exec.work.metadata.MetadataProvider;
 import org.apache.drill.exec.work.metadata.ServerMetaProvider.ServerMetaWorker;
 import org.apache.drill.exec.work.prepare.PreparedStatementProvider.PreparedStatementWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class UserWorker{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
+  static final Logger logger = LoggerFactory.getLogger(UserWorker.class);
 
   private final WorkerBee bee;
   private final QueryCountIncrementer incrementer = new QueryCountIncrementer() {
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 3fc5f7c..9e90d06 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -42,8 +42,9 @@
 import io.netty.util.internal.PlatformDependent;
 
 /**
- * Drill data structure for accessing and manipulating data buffers. This class is integrated with the
- * Drill memory management layer for quota enforcement and buffer sharing.
+ * Drill data structure for accessing and manipulating data buffers. This class
+ * is integrated with the Drill memory management layer for quota enforcement
+ * and buffer sharing.
  */
 @SuppressWarnings("unused")
 public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
@@ -121,18 +122,22 @@
   }
 
   /**
-   * Create a new DrillBuf that is associated with an alternative allocator for the purposes of memory ownership and
-   * accounting. This has no impact on the reference counting for the current DrillBuf except in the situation where the
-   * passed in Allocator is the same as the current buffer.
+   * Create a new DrillBuf that is associated with an alternative allocator for
+   * the purposes of memory ownership and accounting. This has no impact on the
+   * reference counting for the current DrillBuf except in the situation where
+   * the passed in Allocator is the same as the current buffer.
    *
-   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
-   * reference count of 1 (in the case that this is the first time this memory is being associated with the new
-   * allocator) or the current value of the reference count + 1 for the other AllocationManager/BufferLedger combination
-   * in the case that the provided allocator already had an association to this underlying memory.
+   * This operation has no impact on the reference count of this DrillBuf. The
+   * newly created DrillBuf with either have a reference count of 1 (in the case
+   * that this is the first time this memory is being associated with the new
+   * allocator) or the current value of the reference count + 1 for the other
+   * AllocationManager/BufferLedger combination in the case that the provided
+   * allocator already had an association to this underlying memory.
    *
    * @param target
    *          The target allocator to create an association with.
-   * @return A new DrillBuf which shares the same underlying memory as this DrillBuf.
+   * @return A new DrillBuf which shares the same underlying memory as this
+   *         DrillBuf.
    */
   public DrillBuf retain(BufferAllocator target) {
 
@@ -148,28 +153,35 @@
   }
 
   /**
-   * Transfer the memory accounting ownership of this DrillBuf to another allocator. This will generate a new DrillBuf
-   * that carries an association with the underlying memory of this DrillBuf. If this DrillBuf is connected to the
-   * owning BufferLedger of this memory, that memory ownership/accounting will be transferred to the target allocator. If
-   * this DrillBuf does not currently own the memory underlying it (and is only associated with it), this does not
-   * transfer any ownership to the newly created DrillBuf.
+   * Transfer the memory accounting ownership of this DrillBuf to another
+   * allocator. This will generate a new DrillBuf that carries an association
+   * with the underlying memory of this DrillBuf. If this DrillBuf is connected
+   * to the owning BufferLedger of this memory, that memory ownership/accounting
+   * will be transferred to the target allocator. If this DrillBuf does not
+   * currently own the memory underlying it (and is only associated with it),
+   * this does not transfer any ownership to the newly created DrillBuf.
    * <p>
-   * This operation has no impact on the reference count of this DrillBuf. The newly created DrillBuf with either have a
-   * reference count of 1 (in the case that this is the first time this memory is being associated with the new
-   * allocator) or the current value of the reference count for the other AllocationManager/BufferLedger combination in
-   * the case that the provided allocator already had an association to this underlying memory.
+   * This operation has no impact on the reference count of this DrillBuf. The
+   * newly created DrillBuf with either have a reference count of 1 (in the case
+   * that this is the first time this memory is being associated with the new
+   * allocator) or the current value of the reference count for the other
+   * AllocationManager/BufferLedger combination in the case that the provided
+   * allocator already had an association to this underlying memory.
    * <p>
-   * Transfers will always succeed, even if that puts the other allocator into an overlimit situation. This is possible
-   * due to the fact that the original owning allocator may have allocated this memory out of a local reservation
-   * whereas the target allocator may need to allocate new memory from a parent or RootAllocator. This operation is done
-   * in a mostly-lockless but consistent manner. As such, the overlimit==true situation could occur slightly prematurely
-   * to an actual overlimit==true condition. This is simply conservative behavior which means we may return overlimit
-   * slightly sooner than is necessary.
+   * Transfers will always succeed, even if that puts the other allocator into
+   * an overlimit situation. This is possible due to the fact that the original
+   * owning allocator may have allocated this memory out of a local reservation
+   * whereas the target allocator may need to allocate new memory from a parent
+   * or RootAllocator. This operation is done in a mostly-lockless but
+   * consistent manner. As such, the overlimit==true situation could occur
+   * slightly prematurely to an actual overlimit==true condition. This is simply
+   * conservative behavior which means we may return overlimit slightly sooner
+   * than is necessary.
    *
    * @param target
    *          The allocator to transfer ownership to.
-   * @return A new transfer result with the impact of the transfer (whether it was overlimit) as well as the newly
-   *         created DrillBuf.
+   * @return A new transfer result with the impact of the transfer (whether it
+   *         was overlimit) as well as the newly created DrillBuf.
    */
   public TransferResult transferOwnership(BufferAllocator target) {
 
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
index bfce9a0..36550d0 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java
@@ -43,11 +43,11 @@
  * UDLE. Ensures that one allocator owns the memory that multiple allocators may
  * be referencing. Manages a BufferLedger between each of its associated
  * allocators. This class is also responsible for managing when memory is
- * allocated and returned to the Netty-based PooledByteBufAllocatorL.
- *
+ * allocated and returned to the Netty-based {code PooledByteBufAllocatorL}.
+ * <p>
  * The only reason that this isn't package private is we're forced to put
  * DrillBuf in Netty's package which need access to these objects or methods.
- *
+ * <p>
  * Threading: AllocationManager manages thread-safety internally. Operations
  * within the context of a single BufferLedger are lockless in nature and can be
  * leveraged by multiple threads. Operations that cross the context of two
@@ -56,7 +56,6 @@
  * allocation. As such, there will be thousands of these in a typical query. The
  * contention of acquiring a lock on AllocationManager should be very low.
  */
-
 public class AllocationManager {
 
   private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0);
@@ -151,7 +150,6 @@
    * AllocationManager that it now longer needs to hold a reference to
    * particular piece of memory.
    */
-
   private class ReleaseListener {
 
     private final BufferAllocator allocator;
@@ -163,7 +161,6 @@
     /**
      * Can only be called when you already hold the writeLock.
      */
-
     public void release() {
       allocator.assertOpen();
 
@@ -200,7 +197,6 @@
    * only reason this is public is due to DrillBuf being in io.netty.buffer
    * package.
    */
-
   public class BufferLedger {
 
     private final IdentityHashMap<DrillBuf, Object> buffers =
@@ -223,8 +219,10 @@
     }
 
     /**
-     * Transfer any balance the current ledger has to the target ledger. In the case that the current ledger holds no
-     * memory, no transfer is made to the new ledger.
+     * Transfer any balance the current ledger has to the target ledger. In the
+     * case that the current ledger holds no memory, no transfer is made to the
+     * new ledger.
+     *
      * @param target
      *          The ledger to transfer ownership account to.
      * @return Whether transfer fit within target ledgers limits.
@@ -241,8 +239,8 @@
         return true;
       }
 
-      // since two balance transfers out from the allocator manager could cause incorrect accounting, we need to ensure
-      // that this won't happen by synchronizing on the allocator manager instance.
+      // since two balance transfers out from the allocator manager could cause incorrect accounting,
+      // we need to ensure that this won't happen by synchronizing on the allocator manager instance.
       try (@SuppressWarnings("unused") Closeable write = writeLock.open()) {
         if (owningLedger != this) {
           return true;
@@ -316,7 +314,6 @@
      * zero, this ledger should release its ownership back to the
      * AllocationManager
      */
-
     public int decrement(int decrement) {
       allocator.assertOpen();
 
@@ -346,7 +343,6 @@
      * @param allocator
      * @return The ledger associated with a particular BufferAllocator.
      */
-
     public BufferLedger getLedgerForAllocator(BufferAllocator allocator) {
       return associate((BaseAllocator) allocator);
     }
@@ -362,7 +358,6 @@
      * @return A new DrillBuf that shares references with all DrillBufs
      *         associated with this BufferLedger
      */
-
     public DrillBuf newDrillBuf(int offset, int length) {
       allocator.assertOpen();
       return newDrillBuf(offset, length, null);
@@ -411,7 +406,6 @@
      *
      * @return Size in bytes
      */
-
     public int getSize() {
       return size;
     }
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 8514df0..9498cc8 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -20,20 +20,16 @@
 import io.netty.buffer.ByteBuf;
 
 public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
 
   @Override
-  public void failed(RpcException ex) {
-  }
+  public void failed(RpcException ex) { }
 
   @Override
-  public void success(T value, ByteBuf buffer) {
-  }
+  public void success(T value, ByteBuf buffer) { }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void interrupted(final InterruptedException ex) {
-  }
+  public void interrupted(final InterruptedException ex) { }
 }
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
index 83380e2..cc2d9b9 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java
@@ -23,6 +23,8 @@
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.carrotsearch.hppc.IntObjectHashMap;
 import com.carrotsearch.hppc.procedures.IntObjectProcedure;
@@ -38,7 +40,7 @@
  * else works via Atomic variables.
  */
 class RequestIdMap {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestIdMap.class);
+  private static final Logger logger = LoggerFactory.getLogger(RequestIdMap.class);
 
   private final AtomicInteger lastCoordinationId = new AtomicInteger();
   private final AtomicBoolean isOpen = new AtomicBoolean(true);
@@ -72,9 +74,9 @@
 
     @Override
     public void apply(int key, RpcOutcome<?> value) {
-      try{
+      try {
         value.setException(exception);
-      }catch (final Exception e){
+      } catch (final Exception e){
         logger.warn("Failure while attempting to fail rpc response.", e);
       }
     }
@@ -158,6 +160,7 @@
     return rpc;
   }
 
+  @SuppressWarnings("unchecked")
   public <V> RpcOutcome<V> getAndRemoveRpcOutcome(int rpcType, int coordinationId, Class<V> clazz) {
 
     final RpcOutcome<?> rpc = removeFromMap(coordinationId);
@@ -172,16 +175,10 @@
           clazz.getCanonicalName(), rpcType, outcomeClass.getCanonicalName()));
     }
 
-    @SuppressWarnings("unchecked")
-    final
-    RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
-
-    // logger.debug("Returning casted future");
-    return crpc;
+    return (RpcOutcome<V>) rpc;
   }
 
   public void recordRemoteFailure(int coordinationId, DrillPBError failure) {
-    // logger.debug("Updating failed future.");
     try {
       final RpcOutcome<?> rpc = removeFromMap(coordinationId);
       rpc.setException(new UserRemoteException(failure));
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index a1c125b..4afa159 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -27,13 +27,11 @@
    */
   void failed(RpcException ex);
 
-
   void success(V value, ByteBuf buffer);
 
   /**
-   * Called when the sending thread is interrupted. Possible when the fragment is cancelled due to query cancellations or
-   * failures.
+   * Called when the sending thread is interrupted. Possible when the fragment
+   * is cancelled due to query cancellations or failures.
    */
   void interrupted(final InterruptedException e);
-
 }