DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)

* DRILL-8236: Move HttpHelperFunctions to use JSON2 reader

* DRILL-8236: Move HttpHelperFunctions to use JSON2 reader

Co-authored-by: Charles Givre <cgivre@apache.org>
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index 25f9f07..c669677 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -18,7 +18,6 @@
 
 package org.apache.drill.exec.store.http.udfs;
 
-import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.expr.DrillSimpleFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.annotations.Output;
@@ -26,6 +25,7 @@
 import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -52,22 +52,16 @@
     OptionManager options;
 
     @Inject
-    DrillBuf buffer;
+    ResultSetLoader loader;
 
     @Workspace
-    org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
 
     @Override
     public void setup() {
-      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
-        .defaultSchemaPathColumns()
-        .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
-        .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
-        .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
-        .build();
-
-      jsonReader.setIgnoreJSONParseErrors(
-        options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG));
+      jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+        .resultSetLoader(loader)
+        .standardOptions(options);
     }
 
     @Override
@@ -102,10 +96,11 @@
       }
 
       try {
-        jsonReader.setSource(results);
-        jsonReader.setIgnoreJSONParseErrors(true);  // Reduce number of errors
-        jsonReader.write(writer);
-        buffer = jsonReader.getWorkBuf();
+        jsonLoaderBuilder.fromString(results);
+        org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
+        loader.startBatch();
+        jsonLoader.readBatch();
+        loader.close();
       } catch (Exception e) {
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
@@ -134,10 +129,10 @@
     DrillbitContext drillbitContext;
 
     @Inject
-    DrillBuf buffer;
+    ResultSetLoader loader;
 
     @Workspace
-    org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
 
     @Workspace
     org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
@@ -147,15 +142,9 @@
 
     @Override
     public void setup() {
-      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
-        .defaultSchemaPathColumns()
-        .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
-        .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
-        .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
-        .build();
-
-      jsonReader.setIgnoreJSONParseErrors(
-        options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG));
+      jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+        .resultSetLoader(loader)
+        .standardOptions(options);
 
       String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
       // Get the plugin name and endpoint name
@@ -208,10 +197,11 @@
       }
 
       try {
-        jsonReader.setSource(results);
-        jsonReader.setIgnoreJSONParseErrors(true);  // Reduce number of errors
-        jsonReader.write(writer);
-        buffer = jsonReader.getWorkBuf();
+        jsonLoaderBuilder.fromString(results);
+        org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
+        loader.startBatch();
+        jsonLoader.readBatch();
+        loader.close();
       } catch (Exception e) {
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 9e32116..24d73ba 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -18,13 +18,17 @@
 
 package org.apache.drill.exec.store.http;
 
+import ch.qos.logback.classic.Level;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
+import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
 import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -32,6 +36,7 @@
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -51,10 +56,17 @@
   private static final int MOCK_SERVER_PORT = 47771;
   private static String TEST_JSON_RESPONSE;
   private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
-
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
 
   @BeforeClass
   public static void setup() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
+      .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
+      .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
+      .build();
     startCluster(ClusterFixture.builder(dirTestWatcher));
     TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
 
@@ -133,7 +145,7 @@
       client.queryBuilder().sql(sql).run();
       fail();
     } catch (Exception e) {
-      assertTrue(e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
+      assertTrue(e.getMessage(), e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index d0dcace..c42e7de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -44,6 +44,10 @@
     managedBuffers.clear();
   }
 
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
   @Override
   public DrillBuf replace(DrillBuf old, int newSize) {
     if (managedBuffers.remove(old.memoryAddress()) == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 889c251..dafbf88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -47,6 +47,9 @@
 import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -348,6 +351,11 @@
   }
 
   @Override
+  public ResultSetLoader getResultSetLoader() {
+    return new ResultSetLoaderImpl(bufferManager.getAllocator(), new ResultSetOptions());
+  }
+
+  @Override
   public DrillbitEndpoint getForemanEndpoint() {
     return fragment.getForeman();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 0b37ce1..57ba2da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -32,6 +32,9 @@
 import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
 import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
@@ -322,6 +325,11 @@
   }
 
   @Override
+  public ResultSetLoader getResultSetLoader() {
+    return new ResultSetLoaderImpl(allocator, new ResultSetOptions());
+  }
+
+  @Override
   public PartitionExplorer getPartitionExplorer() {
     return new PartitionExplorerImpl(getRootSchema());
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index cee74ff..9863761 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -20,6 +20,7 @@
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
@@ -44,6 +45,7 @@
           .put(OptionManager.class, "getOptions")
           .put(BufferManager.class, "getManagedBufferManager")
           .put(DrillbitContext.class, "getDrillbitContext")
+          .put(ResultSetLoader.class, "getResultSetLoader")
           .build();
 
 
@@ -98,6 +100,8 @@
    */
   DrillbitContext getDrillbitContext();
 
+  ResultSetLoader getResultSetLoader();
+
   /**
    * Works with value holders cache which holds constant value and its wrapper by type.
    * If value is absent uses holderInitializer to create holder and adds it to cache.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index ac83b49..90c11fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -344,29 +344,26 @@
    */
   protected boolean next() {
     switch (state) {
-    case LOOK_AHEAD:
-    case LOOK_AHEAD_WITH_EOF:
-      // Use batch previously read.
-      assert lookahead != null;
-      lookahead.exchange(scanOp.containerAccessor.container());
-      assert lookahead.getRecordCount() == 0;
-      lookahead = null;
-      if (state == State.LOOK_AHEAD_WITH_EOF) {
-        state = State.EOF;
-      } else {
-        state = State.ACTIVE;
+      case LOOK_AHEAD:
+      case LOOK_AHEAD_WITH_EOF:
+        // Use batch previously read.
+        assert lookahead != null;
+        lookahead.exchange(scanOp.containerAccessor.container());
+        assert lookahead.getRecordCount() == 0;
+        lookahead = null;
+        if (state == State.LOOK_AHEAD_WITH_EOF) {
+          state = State.EOF;
+        } else {
+          state = State.ACTIVE;
+        }
+        return true;
+      case ACTIVE:
+        return readBatch();
+      case EOF:
+        return false;
+      default:
+        throw new IllegalStateException("Unexpected state: " + state);
       }
-      return true;
-
-    case ACTIVE:
-      return readBatch();
-
-    case EOF:
-      return false;
-
-    default:
-      throw new IllegalStateException("Unexpected state: " + state);
-    }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index e5755cb..a4ee14f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,10 +20,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -189,6 +191,16 @@
       return this;
     }
 
+    public JsonLoaderBuilder fromString(String jsonString) {
+      try (InputStream targetStream = IOUtils.toInputStream(jsonString, Charset.defaultCharset())) {
+        return fromStream(targetStream);
+      } catch (IOException e) {
+        throw UserException.dataReadError(e)
+          .message("Could not read JSON string: " + jsonString)
+          .build(logger);
+      }
+    }
+
     public JsonLoaderBuilder fromReader(Reader reader) {
       this.reader = reader;
       return this;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index d73d252..4d945c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -18,6 +18,9 @@
 package org.apache.drill.test;
 
 import org.apache.drill.exec.alias.AliasRegistryProvider;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.metastore.MetastoreRegistry;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
@@ -245,6 +248,11 @@
     }
 
     @Override
+    public ResultSetLoader getResultSetLoader() {
+      return new ResultSetLoaderImpl(allocator, new ResultSetOptions());
+    }
+
+    @Override
     public ExecutorState getExecutorState() {
       return executorState;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
index 2bace96..d127ac4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
@@ -156,6 +157,8 @@
       QueryEvent event = listener.get();
       if (event.type == QueryEvent.Type.EOF) {
         state = State.EOF;
+      } else if (event.type == QueryEvent.Type.ERROR) {
+        throw DrillRuntimeException.create("Closed with outstanding buffers allocated");
       }
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index fb1fe15..41b6ece 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -694,21 +694,21 @@
       QueryEvent event = listener.get();
       switch (event.type)
       {
-      case BATCH:
-        batchCount++;
-        recordCount += event.batch.getHeader().getRowCount();
-        event.batch.release();
-        break;
-      case EOF:
-        state = event.state;
-        break loop;
-      case ERROR:
-        throw event.error;
-      case QUERY_ID:
-        queryId = event.queryId;
-        break;
-      default:
-        throw new IllegalStateException("Unexpected event: " + event.type);
+        case BATCH:
+          batchCount++;
+          recordCount += event.batch.getHeader().getRowCount();
+          event.batch.release();
+          break;
+        case EOF:
+          state = event.state;
+          break loop;
+        case ERROR:
+          throw event.error;
+        case QUERY_ID:
+          queryId = event.queryId;
+          break;
+        default:
+          throw new IllegalStateException("Unexpected event: " + event.type);
       }
     }
     long end = System.currentTimeMillis();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index d8e2da1..880c05e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -58,32 +58,27 @@
     }
     while (true) {
       QueryEvent event = listener.get();
-      switch (event.type)
-      {
-      case BATCH:
-        batchCount++;
-        recordCount += event.batch.getHeader().getRowCount();
-        loader.load(event.batch.getHeader().getDef(), event.batch.getData());
-        event.batch.release();
-        return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
-
-      case EOF:
-        state = event.state;
-        eof = true;
-        return null;
-
-      case ERROR:
-        state = event.state;
-        eof = true;
-        throw event.error;
-
-      case QUERY_ID:
-        queryId = event.queryId;
-        continue;
-
-      default:
-        throw new IllegalStateException("Unexpected event: " + event.type);
-      }
+      switch (event.type) {
+        case BATCH:
+          batchCount++;
+          recordCount += event.batch.getHeader().getRowCount();
+          loader.load(event.batch.getHeader().getDef(), event.batch.getData());
+          event.batch.release();
+          return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
+        case EOF:
+          state = event.state;
+          eof = true;
+          return null;
+        case ERROR:
+          state = event.state;
+          eof = true;
+          throw event.error;
+        case QUERY_ID:
+          queryId = event.queryId;
+          continue;
+        default:
+          throw new IllegalStateException("Unexpected event: " + event.type);
+        }
     }
   }
 
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 717ef29..a927042 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -57,7 +57,7 @@
    * because the buffer is used multiple times by an operator.
    */
 
-  private static final int IO_BUFFER_SIZE = 32*1024;
+  private static final int IO_BUFFER_SIZE = 32 * 1024;
   private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
 
   private final BaseAllocator parentAllocator;
@@ -500,13 +500,13 @@
         if (allocatedCount > 0) {
           throw new IllegalStateException(
               String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
-                  name, allocatedCount, toString()));
+                  name, allocatedCount, this));
         }
 
         if (reservations.size() != 0) {
           throw new IllegalStateException(
               String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(),
-                  toString()));
+                this));
         }
 
       }
@@ -535,8 +535,7 @@
 
   @Override
   public String toString() {
-    final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
-        : Verbosity.BASIC;
+    final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE : Verbosity.BASIC;
     final StringBuilder sb = new StringBuilder();
     print(sb, 0, verbosity);
     return sb.toString();
@@ -562,8 +561,7 @@
   /**
    * Rounds up the provided value to the nearest power of two.
    *
-   * @param val
-   *          An integer value.
+   * @param val An integer value.
    * @return The closest power of two of that value.
    */
   public static int nextPowerOfTwo(int val) {
@@ -578,8 +576,7 @@
   /**
    * Rounds up the provided value to the nearest power of two.
    *
-   * @param val
-   *          An integer long value.
+   * @param val An integer long value.
    * @return The closest power of two of that value.
    */
   public static long longNextPowerOfTwo(long val) {
@@ -622,7 +619,6 @@
     }
 
     synchronized (DEBUG_LOCK) {
-
       final long allocated = getAllocatedMemory();
 
       // verify my direct descendants
@@ -689,9 +685,9 @@
         sb.append("allocator[");
         sb.append(name);
         sb.append("]\nallocated: ");
-        sb.append(Long.toString(allocated));
+        sb.append(allocated);
         sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
-        sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
+        sb.append(allocated - (bufferTotal + reservedTotal + childTotal));
         sb.append('\n');
 
         if (bufferTotal != 0) {
@@ -703,14 +699,14 @@
 
         if (childTotal != 0) {
           sb.append("child total: ");
-          sb.append(Long.toString(childTotal));
+          sb.append(childTotal);
           sb.append('\n');
 
           for (final BaseAllocator childAllocator : childSet) {
             sb.append("child allocator[");
             sb.append(childAllocator.name);
             sb.append("] owned ");
-            sb.append(Long.toString(childAllocator.getAllocatedMemory()));
+            sb.append(childAllocator.getAllocatedMemory());
             sb.append('\n');
           }
         }
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 4345a82..df4390f 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
 
 /**
  * Manages a list of {@link DrillBuf}s that can be reallocated as needed. Upon
@@ -62,5 +63,7 @@
    */
   public DrillBuf getManagedBuffer(int size);
 
+  BufferAllocator getAllocator();
+
   public void close();
 }