DRILL-7734: Revise the result set reader

Revised into two forms: push (for streaming JSON results) and
pull (for one operator reading from another).

closes #2077
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
new file mode 100644
index 0000000..1877ec4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PullResultSetReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet;
+
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Iterates over the set of batches in a result set, providing
+ * a row set reader to iterate over the rows within each batch.
+ * Handles schema changes between batches. A typical use is to
+ * iterate over batches from an upstream operator.
+ *
+ * <h4>Protocol</h4>
+ * <ol>
+ * <li>Create the result set reader via a specific implementation.
+ * If a query has a null result (no rows,
+ * no schema), the code which creates this class should instead
+ * indicate that no results are available; this class handles only
+ * the cases in which a schema exists.</li>
+ * <li>Call {@link #schema()}, if desired, to obtain the schema
+ * for this result set.</li>
+ * <li>Call {@link #next()} to advance to the first batch.</li>
+ * <li>If {@code next()} returns {@code true}, then call
+ * {@link #reader()} to obtain a reader over rows. This reader also
+ * provides the batch schema.</li>
+ * <li>Use the reader to iterate over rows in the batch.</li>
+ * <li>Call {@code next()} to advance to the next batch and
+ * repeat.</li>
+ * <li>Call {@link #close()} after all batches are read.</li>
+ * </ol>
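+ * <p>
+ * A minimal usage sketch ({@code source} here stands for whatever
+ * feeds batches to the chosen implementation, such as
+ * {@code PullResultSetReaderImpl}):
+ * <pre><code>
+ * PullResultSetReader rsReader = new PullResultSetReaderImpl(source);
+ * TupleMetadata schema = rsReader.schema();
+ * while (rsReader.next()) {
+ *   RowSetReader reader = rsReader.reader();
+ *   while (reader.next()) {
+ *     // Read column values for the current row
+ *   }
+ * }
+ * rsReader.close();
+ * </code></pre>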
+ * <p>
+ * The implementation may perform complex tasks behind the scenes:
+ * coordinate with the query runner (if remote), drive an operator
+ * (if within a DAG), etc. The implementation is given an interface
+ * that connects it to the source of batches.
+ * <p>
+ * Designed to handle batches arriving from a single upstream
+ * operator. Uses Drill's strict form of schema identity: not
+ * only must the column definitions match, the vectors must
+ * be identical from one batch to the next. If the vectors differ,
+ * this class assumes the schema has changed and rebuilds all
+ * the underlying readers, which can be costly.
+ */
+public interface PullResultSetReader {
+
+  /**
+   * Advance to the next batch of data. The iterator starts
+   * positioned before the first batch (but after obtaining
+   * a schema).
+   * @return {@code true} if another batch is available,
+   * {@code false} if EOF
+   */
+  boolean next();
+
+  /**
+   * Return the schema for this result set.
+   */
+  TupleMetadata schema();
+
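+  /**
+   * Return the schema version for the current batch. The version
+   * increases each time the batch schema changes.
+   */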
+  int schemaVersion();
+
+  /**
+   * Obtain a reader to iterate over the rows of the batch. The same
+   * reader is returned until the schema changes, at which point a
+   * new reader is created; the safest practice is to call this
+   * method after each call to {@link #next()}.
+   */
+  RowSetReader reader();
+
+  /**
+   * Close this reader. Releases any memory still assigned
+   * to the attached batch.
+   */
+  void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java
new file mode 100644
index 0000000..b011cd9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/PushResultSetReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet;
+
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+
+/**
+ * Push-based result set reader, in which the caller obtains batches
+ * and registers them with the implementation. The client is thus
+ * responsible for detecting the end of input and for releasing
+ * batch memory. General protocol:
+ * <p>
+ * <ul>
+ * <li>Create an instance and bind it to a batch source.</li>
+ * <li>Obtain a batch, typically by having it passed in.</li>
+ * <li>Call {@link #start()} to obtain a reader for that batch.</li>
+ * <li>Iterate over the rows.</li>
+ * <li>Release memory for the batch.</li>
+ * </ul>
+ * <p>
+ * In Drill,
+ * batches may have the same or different schemas. Each call to
+ * {@link #start()} prepares a {@link RowSetReader} to use for
+ * the available batch. If the batch has the same schema as the previous,
+ * then the existing reader is simply repositioned at the start of the
+ * batch. If the schema changed (or this is the first batch), then a
+ * new reader is created. Thus, the client should not assume that the
+ * same reader is available across calls. However, if it is useful to
+ * cache column readers, simply check if the reader returned from
+ * {@code start()} is the same as the previous one. If so, the column
+ * readers are also the same.
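+ * <p>
+ * A minimal usage sketch (assuming the caller receives batches from
+ * some upstream {@code source}, as with
+ * {@code PushResultSetReaderImpl}):
+ * <pre><code>
+ * PushResultSetReader rsReader = new PushResultSetReaderImpl(source);
+ * // For each batch the caller receives:
+ * RowSetReader reader = rsReader.start();
+ * while (reader.next()) {
+ *   // Read column values for the current row
+ * }
+ * // The caller then releases the batch's memory
+ * </code></pre>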
+ */
+public interface PushResultSetReader {
+  RowSetReader start();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
index 5d68daf..cc18218 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetCopier.java
@@ -40,12 +40,13 @@
  * each non-schema-change batch.
  *
  * <h4>Protocol</h4>
+ *
  * Overall lifecycle:
  * <ol>
  * <li>Create an instance of the
  *     {@link org.apache.drill.exec.physical.resultSet.impl.ResultSetCopierImpl
- *      ResultSetCopierImpl} class, passing the input batch
- *      accessor to the constructor.</li>
+ *      ResultSetCopierImpl} class, passing the input row set reader
+ *      to the constructor.</li>
  * <li>Loop to process each output batch as shown below. That is, continually
  *     process calls to the {@link BatchIterator#next()} method.</li>
  * <li>Call {@link #close()}.</li>
@@ -57,8 +58,7 @@
  * <pre><code>
  * public IterOutcome next() {
  *   copier.startOutputBatch();
- *   while (! copier.isFull() {
- *     copier.freeInput();
+ *   while (!copier.isFull()) {
  *     IterOutcome innerResult = inner.next();
  *     if (innerResult == DONE) { break; }
  *     copier.startInputBatch();
@@ -92,7 +92,6 @@
  * Because we wish to fill the output batch, we may be able to copy
  * part of a batch, the whole batch, or multiple batches to the output.
  */
-
 public interface ResultSetCopier {
 
   /**
@@ -102,9 +101,9 @@
 
   /**
    * Start the next input batch. The input batch must be held
-   * by the VectorAccessor passed into the constructor.
+   * by the {@code PullResultSetReader} passed into the constructor.
+   *
+   * @return {@code true} if an input batch is available,
+   * {@code false} if the input is exhausted
    */
-  void startInputBatch();
+  boolean nextInputBatch();
 
   /**
    * If copying rows one by one, copy the next row from the
@@ -135,12 +134,6 @@
   void copyAllRows();
 
   /**
-   * Release the input. Must be called (explicitly, or via
-   * {@link #copyInput()} before loading another input batch.
-   */
-  void releaseInputBatch();
-
-  /**
    * Reports if the output batch has rows. Useful after the end
    * of input to determine if a partial output batch exists to
    * send downstream.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
deleted file mode 100644
index 45f3193..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetReader.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.resultSet;
-
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
-
-/**
- * Iterates over the set of batches in a result set, providing
- * a row set reader to iterate over the rows within each batch.
- * Handles schema changes between batches.
- * <p>
- * Designed to handle batches arriving from a single upstream
- * operator. Uses Drill's strict form of schema identity: that
- * not only must the column definitions match; the vectors must
- * be identical from one batch to the next. If the vectors differ,
- * then this class assumes a new schema has occurred, and will
- * rebuild all the underlying readers, which can be costly.
- *
- * <h4>Protocol</h4>
- * <ol>
- * <li>Create an instance, passing in a
- *     {@link BatchAccessor} to hold the batch and optional
- *     selection vector.</li>
- * <li>For each incoming batch:
- *   <ol>
- *   <li>Call {@link #start()} to attach the batch. The associated
- *       {@link BatchAccessor} reports if the schema has changed.</li>
- *   <li>Call {@link #reader()} to obtain a reader.</li>
- *   <li>Iterate over the batch using the reader.</li>
- *   <li>Call {@link #release()} to free the memory for the
- *       incoming batch. Or, to call {@link #detach()} to keep
- *       the batch memory.</li>
- *   </ol>
- * <li>Call {@link #close()} after all batches are read.</li>
- * </ol>
- */
-public interface ResultSetReader {
-
-  /**
-   * Start tracking a new batch in the associated
-   * vector container.
-   */
-  void start();
-
-  /**
-   * Get the row reader for this batch. The row reader is
-   * guaranteed to remain the same for the life of the
-   * result set reader.
-   *
-   * @return the row reader to read rows for the current
-   * batch
-   */
-  RowSetReader reader();
-
-  /**
-   * Detach the batch of data from this reader. Does not
-   * release the memory for that batch.
-   */
-  void detach();
-
-  /**
-   * Detach the batch of data from this reader and release
-   * the memory for that batch. Call this method before
-   * loading the underlying vector container with more
-   * data, then call {@link #start()} after new data is
-   * available.
-   */
-  void release();
-
-  /**
-   * Close this reader. Releases any memory still assigned
-   * to any attached batch. Call {@link #detach()} first if
-   * you want to preserve the batch memory.
-   */
-  void close();
-
-  /**
-   * Convenience method to access the input batch.
-   * @return the batch bound to the reader at construction
-   * time
-   */
-  BatchAccessor inputBatch();
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java
new file mode 100644
index 0000000..dc803de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PullResultSetReaderImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+/**
+ * Pull-based result set reader implementation.
+ *
+ * <h4>Protocol</h4>
+ * <ol>
+ * <li>Create an instance, passing in an
+ *     {@link UpstreamSource} to provide batches and an optional
+ *     selection vector.</li>
+ * <li>Call {@link #schema()}, if desired, to obtain the schema
+ *     for the result set.</li>
+ * <li>For each incoming batch:
+ *   <ol>
+ *   <li>Call {@link #next()} to advance to the batch.
+ *       {@link #schemaVersion()} reports if the schema has changed.</li>
+ *   <li>Call {@link #reader()} to obtain a reader.</li>
+ *   <li>Iterate over the batch using the reader.</li>
+ *   </ol>
+ * <li>Call {@link #close()} after all batches are read. The reader
+ *     releases each batch's memory via the {@link UpstreamSource}.</li>
+ * </ol>
+ */
+public class PullResultSetReaderImpl implements PullResultSetReader {
+
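+  /**
+   * Pull-style source of batches: extends the push-style source
+   * with the ability to advance to the next batch and to release
+   * the current batch's memory.
+   */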
+  public interface UpstreamSource extends PushResultSetReaderImpl.UpstreamSource {
+    boolean next();
+    void release();
+  }
+
+  @VisibleForTesting
+  protected enum State {
+      START,
+      PENDING,
+      BATCH,
+      DETACHED,
+      EOF,
+      CLOSED
+  }
+
+  private final PushResultSetReaderImpl baseReader;
+  private final UpstreamSource source;
+  private State state = State.START;
+  private RowSetReader rowSetReader;
+
+  public PullResultSetReaderImpl(UpstreamSource source) {
+    this.baseReader = new PushResultSetReaderImpl(source);
+    this.source = source;
+  }
+
+  @Override
+  public TupleMetadata schema() {
+    switch (state) {
+      case CLOSED:
+        return null;
+      case START:
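+        // Peek at the first batch to learn the schema, then
+        // mark the batch as pending so the next call to next()
+        // returns this same batch.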
+        if (!next()) {
+          return null;
+        }
+        state = State.PENDING;
+        break;
+      default:
+    }
+    return rowSetReader.tupleSchema();
+  }
+
+  @Override
+  public boolean next() {
+    switch (state) {
+      case PENDING:
+        state = State.BATCH;
+        return true;
+      case BATCH:
+        source.release();
+        break;
+      case CLOSED:
+        throw new IllegalStateException("Reader is closed");
+      case EOF:
+        return false;
+      case START:
+        break;
+      default:
+        source.release();
+    }
+    if (!source.next()) {
+      state = State.EOF;
+      return false;
+    }
+
+    rowSetReader = baseReader.start();
+    state = State.BATCH;
+    return true;
+  }
+
+  @Override
+  public int schemaVersion() { return source.schemaVersion(); }
+
+  @Override
+  public RowSetReader reader() {
+    Preconditions.checkState(state == State.BATCH, "Not in batch-ready state.");
+    return rowSetReader;
+  }
+
+  @Override
+  public void close() {
+    source.release();
+    state = State.CLOSED;
+  }
+
+  @VisibleForTesting
+  protected State state() { return state; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java
new file mode 100644
index 0000000..7fc1156
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/PushResultSetReaderImpl.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.resultSet.impl;
+
+import org.apache.drill.exec.physical.resultSet.PushResultSetReader;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.IndirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetReader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+public class PushResultSetReaderImpl implements PushResultSetReader {
+
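+  /**
+   * Source of batches for the push reader: provides the schema
+   * version, the batch's vector container, and an optional
+   * selection vector.
+   */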
+  public interface UpstreamSource {
+    int schemaVersion();
+    VectorContainer batch();
+    SelectionVector2 sv2();
+  }
+
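+  /**
+   * Upstream source that wraps a single vector container which the
+   * caller refills for each batch. Call {@link #newBatch()} after
+   * loading new data so that the schema version is bumped when the
+   * container reports a schema change.
+   */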
+  public static class BatchHolder implements UpstreamSource {
+
+    private final VectorContainer container;
+    private int schemaVersion;
+
+    public BatchHolder(VectorContainer container) {
+      this.container = container;
+    }
+
+    public void newBatch() {
+      if (schemaVersion == 0) {
+        schemaVersion = 1;
+      } else if (container.isSchemaChanged()) {
+        schemaVersion++;
+      }
+    }
+
+    @Override
+    public int schemaVersion() { return schemaVersion; }
+
+    @Override
+    public VectorContainer batch() { return container; }
+
+    @Override
+    public SelectionVector2 sv2() { return null; }
+  }
+
+  private final UpstreamSource source;
+  private int priorSchemaVersion;
+  private RowSetReader rowSetReader;
+
+  public PushResultSetReaderImpl(UpstreamSource source) {
+    this.source = source;
+  }
+
+  @Override
+  public RowSetReader start() {
+    int sourceSchemaVersion = source.schemaVersion();
+    Preconditions.checkState(sourceSchemaVersion > 0);
+    Preconditions.checkState(priorSchemaVersion <= sourceSchemaVersion);
+
+    // If new schema, discard the old reader (if any) and create
+    // a new one that matches the new schema. If not a new schema,
+    // then the old reader is reused: it points to vectors which
+    // Drill requires be the same vectors as the previous batch,
+    // but with different buffers.
+    boolean newSchema = priorSchemaVersion != sourceSchemaVersion;
+    if (newSchema) {
+      rowSetReader = createRowSet().reader();
+      priorSchemaVersion = sourceSchemaVersion;
+    } else {
+      rowSetReader.newBatch();
+    }
+    return rowSetReader;
+  }
+
+  // TODO: Build the reader without the need for a row set
+  private RowSet createRowSet() {
+    VectorContainer container = source.batch();
+    switch (container.getSchema().getSelectionVectorMode()) {
+    case FOUR_BYTE:
+      throw new IllegalArgumentException("Build from SV4 not yet supported");
+    case NONE:
+      return DirectRowSet.fromContainer(container);
+    case TWO_BYTE:
+      return IndirectRowSet.fromSv2(container, source.sv2());
+    default:
+      throw new IllegalStateException("Invalid selection mode");
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
index 9c596ff..1dd8d54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetCopierImpl.java
@@ -18,14 +18,12 @@
 package org.apache.drill.exec.physical.resultSet.impl;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
 import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.ResultSetReader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.accessor.ColumnReader;
 import org.apache.drill.exec.vector.accessor.ColumnWriter;
@@ -40,6 +38,7 @@
     BATCH_ACTIVE,
     NEW_SCHEMA,
     SCHEMA_PENDING,
+    END_OF_INPUT,
     CLOSED
   }
 
@@ -69,7 +68,7 @@
   // Input state
 
   private int currentSchemaVersion = -1;
-  private final ResultSetReader resultSetReader;
+  private final PullResultSetReader resultSetReader;
   protected RowSetReader rowReader;
 
   // Output state
@@ -85,14 +84,14 @@
   private CopyPair[] projection;
   private CopyAll activeCopy;
 
-  public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch) {
-    this(allocator, inputBatch, new ResultSetOptionBuilder());
+  public ResultSetCopierImpl(BufferAllocator allocator, PullResultSetReader source) {
+    this(allocator, source, new ResultSetOptionBuilder());
   }
 
-  public ResultSetCopierImpl(BufferAllocator allocator, BatchAccessor inputBatch,
+  public ResultSetCopierImpl(BufferAllocator allocator, PullResultSetReader source,
       ResultSetOptionBuilder outputOptions) {
     this.allocator = allocator;
-    resultSetReader = new ResultSetReaderImpl(inputBatch);
+    resultSetReader = source;
     writerOptions = outputOptions;
     writerOptions.vectorCache(new ResultVectorCacheImpl(allocator));
     state = State.START;
@@ -104,7 +103,6 @@
 
       // No schema yet. Defer real batch start until we see an input
       // batch.
-
       state = State.NO_SCHEMA;
       return;
     }
@@ -112,7 +110,6 @@
     if (state == State.SCHEMA_PENDING) {
 
       // We have a pending new schema. Create new writers to match.
-
       createProjection();
     }
     resultSetWriter.startBatch();
@@ -120,67 +117,58 @@
     if (isCopyPending()) {
 
       // Resume copying if a copy is active.
-
       copyBlock();
     }
   }
 
   @Override
-  public void startInputBatch() {
+  public boolean nextInputBatch() {
+    if (state == State.END_OF_INPUT) {
+      return false;
+    }
     Preconditions.checkState(state == State.NO_SCHEMA || state == State.NEW_SCHEMA ||
                              state == State.BATCH_ACTIVE,
         "Can only start input while in an output batch");
     Preconditions.checkState(!isCopyPending(),
         "Finish the pending copy before changing input");
 
-    bindInput();
+    if (!resultSetReader.next()) {
+      state = State.END_OF_INPUT;
+      return false;
+    }
+    rowReader = resultSetReader.reader();
 
     if (state == State.BATCH_ACTIVE) {
 
       // If no schema change, we are ready to copy.
-
-      if (currentSchemaVersion == resultSetReader.inputBatch().schemaVersion()) {
-        return;
+      if (currentSchemaVersion == resultSetReader.schemaVersion()) {
+        return true;
       }
 
       // The schema has changed. Handle it now or later.
-
       if (hasOutputRows()) {
 
         // Output batch has rows. Can't switch and bind inputs
         // until current batch is sent downstream.
-
         state = State.NEW_SCHEMA;
-        return;
+        return true;
       }
     }
 
     // The schema changed: first schema, or a change while a batch
     // is active but the batch is empty.
-
     if (state == State.NO_SCHEMA) {
       state = State.BATCH_ACTIVE;
     } else {
 
       // Discard the unused empty batch
-
       harvest().zeroVectors();
     }
     createProjection();
     resultSetWriter.startBatch();
 
     // Stay in the current state.
-  }
-
-  protected void bindInput() {
-    resultSetReader.start();
-    rowReader = resultSetReader.reader();
-  }
-
-  @Override
-  public void releaseInputBatch() {
-    Preconditions.checkState(state != State.CLOSED);
-    resultSetReader.release();
+    return true;
   }
 
   private void createProjection() {
@@ -190,14 +178,13 @@
       // will tear down the whole show. But, the vector cache will
       // ensure that the new writer reuses any matching vectors from
       // the prior batch to provide vector persistence as Drill expects.
-
       resultSetWriter.close();
     }
-    TupleMetadata schema = MetadataUtils.fromFields(resultSetReader.inputBatch().schema());
+    TupleMetadata schema = resultSetReader.schema();
     writerOptions.readerSchema(schema);
     resultSetWriter = new ResultSetLoaderImpl(allocator, writerOptions.build());
     rowWriter = resultSetWriter.writer();
-    currentSchemaVersion = resultSetReader.inputBatch().schemaVersion();
+    currentSchemaVersion = resultSetReader.schemaVersion();
 
     int colCount = schema.size();
     projection = new CopyPair[colCount];
@@ -225,6 +212,7 @@
     case BATCH_ACTIVE:
       return rowWriter.isFull();
     case NEW_SCHEMA:
+    case END_OF_INPUT:
       return true;
     default:
       return false;
@@ -288,7 +276,8 @@
 
   @Override
   public VectorContainer harvest() {
-    Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA);
+    Preconditions.checkState(state == State.BATCH_ACTIVE || state == State.NEW_SCHEMA ||
+                             state == State.END_OF_INPUT);
     VectorContainer output = resultSetWriter.harvest();
     state = (state == State.BATCH_ACTIVE)
         ? State.BETWEEN_BATCHES : State.SCHEMA_PENDING;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
deleted file mode 100644
index 6046c97..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetReaderImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.resultSet.impl;
-
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.resultSet.ResultSetReader;
-import org.apache.drill.exec.physical.rowSet.RowSetReader;
-import org.apache.drill.exec.physical.rowSet.RowSets;
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-public class ResultSetReaderImpl implements ResultSetReader {
-
-  @VisibleForTesting
-  protected enum State {
-      START,
-      BATCH,
-      DETACHED,
-      CLOSED
-  }
-
-  private State state = State.START;
-  private int priorSchemaVersion;
-  private final BatchAccessor batch;
-  private RowSetReader rowSetReader;
-
-  public ResultSetReaderImpl(BatchAccessor batch) {
-    this.batch = batch;
-  }
-
-  @Override
-  public void start() {
-    Preconditions.checkState(state != State.CLOSED, "Reader is closed");
-    Preconditions.checkState(state != State.BATCH,
-        "Call detach/release before starting another batch");
-    Preconditions.checkState(state == State.START ||
-        priorSchemaVersion <= batch.schemaVersion());
-    boolean newSchema = state == State.START ||
-        priorSchemaVersion != batch.schemaVersion();
-    state = State.BATCH;
-
-    // If new schema, discard the old reader (if any, and create
-    // a new one that matches the new schema. If not a new schema,
-    // then the old reader is reused: it points to vectors which
-    // Drill requires be the same vectors as the previous batch,
-    // but with different buffers.
-
-    if (newSchema) {
-      rowSetReader = RowSets.wrap(batch).reader();
-      priorSchemaVersion = batch.schemaVersion();
-    } else {
-      rowSetReader.newBatch();
-    }
-  }
-
-  @Override
-  public RowSetReader reader() {
-    Preconditions.checkState(state == State.BATCH, "Call start() before requesting the reader.");
-    return rowSetReader;
-  }
-
-  @Override
-  public void detach() {
-    if (state != State.START) {
-      Preconditions.checkState(state == State.BATCH || state == State.DETACHED);
-      state = State.DETACHED;
-    }
-  }
-
-  @Override
-  public void release() {
-    if (state != State.START && state != State.DETACHED) {
-      detach();
-      batch.release();
-    }
-  }
-
-  @Override
-  public void close() {
-    if (state != State.CLOSED) {
-      release();
-      state = State.CLOSED;
-    }
-  }
-
-  @VisibleForTesting
-  protected State state() { return state; }
-
-  @Override
-  public BatchAccessor inputBatch() { return batch; }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
index 13780de..8404dc4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java
@@ -24,12 +24,11 @@
 
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.impl.protocol.IndirectContainerAccessor;
-import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
 import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
 import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -39,6 +38,7 @@
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector2Builder;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
@@ -55,77 +55,95 @@
         .add("name", MinorType.VARCHAR)
         .build();
 
-  private static class BaseDataGen {
-    protected final TupleMetadata schema;
+  private abstract static class BaseDataGen implements UpstreamSource {
+    protected int schemaVersion = 1;
     protected final ResultSetLoader rsLoader;
-    protected final VectorContainerAccessor batch = new VectorContainerAccessor();
+    protected VectorContainer batch;
+    protected int batchCount;
+    protected int rowCount;
+    protected int batchSize;
+    protected int batchLimit;
 
-    public BaseDataGen(TupleMetadata schema) {
-      this.schema = schema;
+    public BaseDataGen(TupleMetadata schema, int batchSize, int batchLimit) {
       ResultSetOptions options = new ResultSetOptionBuilder()
           .readerSchema(schema)
           .vectorCache(new ResultVectorCacheImpl(fixture.allocator()))
           .build();
       rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+      this.batchSize = batchSize;
+      this.batchLimit = batchLimit;
     }
 
-    public TupleMetadata schema() { return schema; }
+    @Override
+    public int schemaVersion() { return schemaVersion; }
 
-    public BatchAccessor batchAccessor() {
-      return batch;
+    @Override
+    public VectorContainer batch() { return batch; }
+
+    @Override
+    public boolean next() {
+      if (batchCount >= batchLimit) {
+        return false;
+      }
+      makeBatch();
+      return true;
+    }
+
+    protected abstract void makeBatch();
+
+    @Override
+    public SelectionVector2 sv2() { return null; }
+
+    @Override
+    public void release() {
+      if (batch != null) {
+        batch.zeroVectors();
+      }
+      SelectionVector2 sv2 = sv2();
+      if (sv2 != null) {
+        sv2.clear();
+      }
     }
   }
 
   private static class DataGen extends BaseDataGen {
 
     public DataGen() {
-      super(TEST_SCHEMA);
+      this(3, 1);
+    }
+
+    public DataGen(int batchSize, int batchLimit) {
+      super(TEST_SCHEMA, batchSize, batchLimit);
     }
 
-    public void makeBatch(int start, int end) {
+    @Override
+    protected void makeBatch() {
       rsLoader.startBatch();
-      for (int i = start; i <= end; i++) {
-        rsLoader.writer().addRow(i, "Row " + i);
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
+        rsLoader.writer().addRow(rowCount, "Row " + rowCount);
       }
-      batch.addBatch(rsLoader.harvest());
-    }
-  }
-
-  public static class DataGen2 extends DataGen {
-    private final int batchCount = 2;
-    private final int batchSize = 5;
-    private int batchIndex;
-
-    boolean next() {
-      if (batchIndex >= batchCount) {
-        return false;
-      }
-      int start = nextRow();
-      makeBatch(start, start + batchSize - 1);
-      batchIndex++;
-      return true;
-    }
-
-    int nextRow() {
-      return batchIndex * batchSize + 1;
-    }
-
-    int targetRowCount( ) {
-      return batchCount * batchSize;
+      batch = rsLoader.harvest();
+      batchCount++;
     }
   }
 
   public static class SchemaChangeGen extends DataGen {
-    private int batchIndex;
-    public final int batchSize = 5;
-    private int schemaVersion = 1;
 
-    public void makeBatch2(int start, int end) {
-      rsLoader.startBatch();
-      for (int i = start; i <= end; i++) {
-        rsLoader.writer().addRow(i, "Row " + i, i * 10);
-      }
-      batch.addBatch(rsLoader.harvest());
+    int schema1Limit;
+
+    public SchemaChangeGen(int batchSize, int batchLimit, int schema1Limit) {
+      super(batchSize, batchLimit);
+      this.schema1Limit = schema1Limit;
+    }
+
+    public SchemaChangeGen(int schema1Limit) {
+      super(3, 3);
+      this.schema1Limit = schema1Limit;
+    }
+
+    public SchemaChangeGen() {
+      this(2);
     }
 
     public TupleMetadata schema2() {
@@ -136,21 +154,32 @@
           .build();
     }
 
+    @Override
+    protected void makeBatch() {
+      if (batchCount < schema1Limit) {
+        super.makeBatch();
+      } else if (batchCount == schema1Limit) {
+        evolveSchema();
+        makeBatch2();
+      } else {
+        makeBatch2();
+      }
+    }
+
+    public void makeBatch2() {
+      rsLoader.startBatch();
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
+        rsLoader.writer().addRow(rowCount, "Row " + rowCount, rowCount * 10);
+      }
+      batch = rsLoader.harvest();
+      batchCount++;
+    }
+
     public void evolveSchema() {
       rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED));
       schemaVersion = 2;
     }
-
-    public void nextBatch() {
-      int start = batchIndex * batchSize + 1;
-      int end = start + batchSize - 1;
-      if (schemaVersion == 1) {
-        makeBatch(start, end);
-      } else {
-        makeBatch2(start, end);
-      }
-      batchIndex++;
-    }
   }
 
   private static class NullableGen extends BaseDataGen {
@@ -160,24 +189,28 @@
           .add("id", MinorType.INT)
           .addNullable("name", MinorType.VARCHAR)
           .addNullable("amount", MinorType.INT)
-          .build());
+          .build(),
+          10, 1);
     }
 
-    public void makeBatch(int start, int end) {
+    @Override
+    protected void makeBatch() {
       rsLoader.startBatch();
       RowSetLoader writer = rsLoader.writer();
-      for (int i = start; i <= end; i++) {
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
         writer.start();
-        writer.scalar(0).setInt(i);
+        writer.scalar(0).setInt(rowCount);
         if (i % 2 == 0) {
-          writer.scalar(1).setString("Row " + i);
+          writer.scalar(1).setString("Row " + rowCount);
         }
         if (i % 3 == 0) {
-          writer.scalar(2).setInt(i * 10);
+          writer.scalar(2).setInt(rowCount * 10);
         }
         writer.save();
       }
-      batch.addBatch(rsLoader.harvest());
+      batch = rsLoader.harvest();
+      batchCount++;
     }
   }
 
@@ -187,23 +220,27 @@
       super(new SchemaBuilder()
           .add("id", MinorType.INT)
           .addArray("name", MinorType.VARCHAR)
-          .build());
+          .build(),
+          3, 1);
     }
 
-    public void makeBatch(int start, int end) {
+    @Override
+    protected void makeBatch() {
       rsLoader.startBatch();
       RowSetLoader writer = rsLoader.writer();
       ArrayWriter aw = writer.array(1);
-      for (int i = start; i <= end; i++) {
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
         writer.start();
-        writer.scalar(0).setInt(i);
+        writer.scalar(0).setInt(rowCount);
         int n = i % 3;
         for (int j = 0; j < n; j++) {
-          aw.scalar().setString("Row " + i + "." + j);
+          aw.scalar().setString("Row " + rowCount + "." + j);
         }
         writer.save();
       }
-      batch.addBatch(rsLoader.harvest());
+      batch = rsLoader.harvest();
+      batchCount++;
     }
   }
 
@@ -216,38 +253,80 @@
             .add("name", MinorType.VARCHAR)
             .add("amount", MinorType.INT)
             .resumeSchema()
-          .build());
+          .build(),
+          3, 1);
     }
 
-    public void makeBatch(int start, int end) {
+    @Override
+    protected void makeBatch() {
       rsLoader.startBatch();
       RowSetLoader writer = rsLoader.writer();
       ArrayWriter aw = writer.array(1);
       TupleWriter mw = aw.entry().tuple();
-      for (int i = start; i <= end; i++) {
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
         writer.start();
-        writer.scalar(0).setInt(i);
+        writer.scalar(0).setInt(rowCount);
         int n = i % 3;
         for (int j = 0; j < n; j++) {
-          mw.scalar(0).setString("Row " + i + "." + j);
-          mw.scalar(1).setInt(i * 100 + j);
+          mw.scalar(0).setString("Row " + rowCount + "." + j);
+          mw.scalar(1).setInt(rowCount * 100 + j);
           aw.save();
         }
         writer.save();
       }
-      batch.addBatch(rsLoader.harvest());
+      batch = rsLoader.harvest();
+      batchCount++;
     }
   }
 
+  public static class FilteredGen extends DataGen {
+
+    SelectionVector2 sv2;
+
+    public FilteredGen() {
+      super(10, 1);
+    }
+
+    @Override
+    protected void makeBatch() {
+      super.makeBatch();
+      makeSv2();
+    }
+
+    // Pick out every other record, in descending
+    // order.
+    private void makeSv2() {
+      SelectionVector2Builder sv2Builder =
+          new SelectionVector2Builder(fixture.allocator(), batch.getRecordCount());
+      for (int i = 0; i < 5; i++) {
+        sv2Builder.setNext(10 - 2 * i - 1);
+      }
+      sv2 = sv2Builder.harvest(batch);
+      batch.buildSchema(SelectionVectorMode.TWO_BYTE);
+    }
+
+    @Override
+    public SelectionVector2 sv2() { return sv2; }
+  }
+
+  private ResultSetCopierImpl newCopier(UpstreamSource source) {
+    PullResultSetReader reader = new PullResultSetReaderImpl(source);
+    return new ResultSetCopierImpl(fixture.allocator(), reader);
+  }
+
+  private ResultSetCopierImpl newCopier(UpstreamSource source, ResultSetOptionBuilder outputOptions) {
+    PullResultSetReader reader = new PullResultSetReaderImpl(source);
+    return new ResultSetCopierImpl(fixture.allocator(), reader, outputOptions);
+  }
+
   @Test
   public void testBasics() {
 
     DataGen dataGen = new DataGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(dataGen);
 
     // Nothing should work yet
-
     try {
       copier.copyAllRows();
       fail();
@@ -262,28 +341,23 @@
     }
 
     // Predicates should work
-
     assertFalse(copier.isCopyPending());
     assertFalse(copier.hasOutputRows());
     assertFalse(copier.isOutputFull());
 
     // Define a schema and start an output batch.
-
     copier.startOutputBatch();
     assertFalse(copier.isCopyPending());
     assertFalse(copier.hasOutputRows());
     assertFalse(copier.isOutputFull());
 
-    // Provide an input row
-
-    dataGen.makeBatch(1, 3);
-    copier.startInputBatch();
+    // Provide an input batch
+    assertTrue(copier.nextInputBatch());
     assertFalse(copier.isCopyPending());
     assertFalse(copier.hasOutputRows());
     assertFalse(copier.isOutputFull());
 
     // Now can do some actual copying
-
     while (copier.copyNextRow()) {
       // empty
     }
@@ -294,72 +368,62 @@
     // Get and verify the output batch
     // (Does not free the input batch, we reuse it
     // in the verify step below.)
-
     RowSet result = fixture.wrap(copier.harvest());
-    new RowSetComparison(fixture.wrap(dataGen.batchAccessor().container()))
+    new RowSetComparison(fixture.wrap(dataGen.batch()))
       .verifyAndClear(result);
 
-    // Copier will release the input batch
+    // No more input
+    copier.startOutputBatch();
+    assertFalse(copier.nextInputBatch());
 
+    // OK to try multiple times
+    assertFalse(copier.nextInputBatch());
+
+    // Copier will release the input batch
     copier.close();
   }
 
   @Test
   public void testImmediateClose() {
 
-    DataGen dataGen = new DataGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new DataGen());
 
     // Close OK before things got started
-
     copier.close();
 
     // Second close is benign
-
     copier.close();
   }
 
   @Test
   public void testCloseBeforeSchema() {
 
-    DataGen dataGen = new DataGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new DataGen());
 
     // Start batch, no data yet.
-
     copier.startOutputBatch();
 
     // Close OK before data arrives
-
     copier.close();
 
     // Second close is benign
-
     copier.close();
   }
 
   @Test
   public void testCloseWithData() {
 
-    DataGen dataGen = new DataGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new DataGen());
 
     // Start batch, with data.
-
     copier.startOutputBatch();
-    dataGen.makeBatch(1, 3);
-    copier.startInputBatch();
+    copier.nextInputBatch();
     copier.copyNextRow();
 
     // Close OK with input and output batch allocated.
-
     copier.close();
 
     // Second close is benign
-
     copier.close();
   }
 
@@ -371,27 +435,25 @@
    * This copier does not support merging from multiple
    * streams.
    */
-
   @Test
   public void testMerge() {
-    DataGen dataGen = new DataGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new DataGen(3, 5));
     copier.startOutputBatch();
 
     for (int i = 0; i < 5; i++) {
-      int start = i * 3 + 1;
-      dataGen.makeBatch(start, start + 2);
-      copier.startInputBatch();
+      assertTrue(copier.nextInputBatch());
       assertFalse(copier.isOutputFull());
       copier.copyAllRows();
-      copier.releaseInputBatch();
       assertFalse(copier.isOutputFull());
       assertFalse(copier.isCopyPending());
     }
+    assertFalse(copier.nextInputBatch());
     RowSet result = fixture.wrap(copier.harvest());
-    dataGen.makeBatch(1, 15);
-    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+
+    // Verify against a single batch holding all the rows
+    DataGen dataGen = new DataGen(15, 1);
+    dataGen.next();
+    RowSet expected = RowSets.wrap(dataGen.batch());
     RowSetUtilities.verify(expected, result);
 
     copier.close();
@@ -399,81 +461,67 @@
 
   @Test
   public void testMultiOutput() {
-    DataGen2 dataGen = new DataGen2();
-    DataGen validatorGen = new DataGen();
 
     // Equivalent of operator start() method.
-
+    DataGen dataGen = new DataGen(15, 2);
     ResultSetOptionBuilder options = new ResultSetOptionBuilder()
         .rowCountLimit(12);
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor(), options);
+    ResultSetCopier copier = newCopier(dataGen, options);
 
     // Equivalent of an entire operator run
-
-    int start = 1;
+    DataGen validatorGen = new DataGen(12, 2);
+    int outputCount = 0;
     while (true) {
 
       // Equivalent of operator next() method
-
       copier.startOutputBatch();
       while (! copier.isOutputFull()) {
-        copier.releaseInputBatch();
-        if (! dataGen.next()) {
+        if (!copier.nextInputBatch()) {
           break;
         }
-        copier.startInputBatch();
         copier.copyAllRows();
       }
-      if (! copier.hasOutputRows()) {
+      if (!copier.hasOutputRows()) {
         break;
       }
 
       // Equivalent of sending downstream
-
       RowSet result = fixture.wrap(copier.harvest());
-       int nextRow = dataGen.nextRow();
-      validatorGen.makeBatch(start, nextRow - 1);
-      RowSet expected = RowSets.wrap(validatorGen.batchAccessor());
-      RowSetUtilities.verify(expected, result);
-      start = nextRow;
+
+      validatorGen.next();
+      RowSet expected = RowSets.wrap(validatorGen.batch());
+      RowSetUtilities.verify(expected, result, result.rowCount());
+      outputCount++;
     }
 
     // Ensure more than one output batch.
-
-    assertTrue(start > 1);
+    assertTrue(outputCount > 1);
 
     // Ensure all rows generated.
-
-    assertEquals(dataGen.targetRowCount(), start - 1);
+    assertEquals(30, dataGen.rowCount);
 
     // Simulate operator close();
-
     copier.close();
   }
 
   @Test
   public void testCopyRecord() {
-    DataGen dataGen = new DataGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new DataGen(3, 2));
     copier.startOutputBatch();
 
-    dataGen.makeBatch(1, 3);
-    copier.startInputBatch();
+    copier.nextInputBatch();
     copier.copyRow(2);
     copier.copyRow(0);
     copier.copyRow(1);
-    copier.releaseInputBatch();
 
-    dataGen.makeBatch(4, 6);
-    copier.startInputBatch();
+    copier.nextInputBatch();
     copier.copyRow(1);
     copier.copyRow(0);
     copier.copyRow(2);
-    copier.releaseInputBatch();
 
-    RowSet expected = new RowSetBuilder(fixture.allocator(), dataGen.schema())
+    assertFalse(copier.nextInputBatch());
+
+    RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
         .addRow(3, "Row 3")
         .addRow(1, "Row 1")
         .addRow(2, "Row 2")
@@ -481,67 +529,49 @@
         .addRow(4, "Row 4")
         .addRow(6, "Row 6")
         .build();
-    RowSet result = fixture.wrap(copier.harvest());
-    RowSetUtilities.verify(expected, result);
+    RowSetUtilities.verify(expected, fixture.wrap(copier.harvest()));
 
     copier.close();
   }
 
   @Test
   public void testSchemaChange() {
-    SchemaChangeGen dataGen = new SchemaChangeGen();
-    SchemaChangeGen verifierGen = new SchemaChangeGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new SchemaChangeGen(3, 4, 2));
 
     // Copy first batch with first schema
-
     copier.startOutputBatch();
-    dataGen.nextBatch();
-    copier.startInputBatch();
+    assertTrue(copier.nextInputBatch());
     copier.copyAllRows();
     assertFalse(copier.isOutputFull());
 
     // Second, same schema
-
-    copier.releaseInputBatch();
-    dataGen.nextBatch();
-    copier.startInputBatch();
+    assertTrue(copier.nextInputBatch());
     copier.copyAllRows();
     assertFalse(copier.isOutputFull());
 
     // Plenty of room. But, change the schema.
-
-    copier.releaseInputBatch();
-    dataGen.evolveSchema();
-    dataGen.nextBatch();
-    copier.startInputBatch();
+    assertTrue(copier.nextInputBatch());
     assertTrue(copier.isOutputFull());
 
     // Must harvest partial output
-
     RowSet result = fixture.wrap(copier.harvest());
-    verifierGen.makeBatch(1, 2 * dataGen.batchSize - 1);
-    RowSet expected = RowSets.wrap(verifierGen.batchAccessor());
+    SchemaChangeGen verifierGen = new SchemaChangeGen(6, 2, 1);
+    verifierGen.next();
+    RowSet expected = RowSets.wrap(verifierGen.batch());
     RowSetUtilities.verify(expected, result);
 
     // Start a new batch, implicitly complete pending copy
-
     copier.startOutputBatch();
     copier.copyAllRows();
 
     // Add one more of second schema
-
-    copier.releaseInputBatch();
-    dataGen.nextBatch();
-    copier.startInputBatch();
+    assertTrue(copier.nextInputBatch());
     copier.copyAllRows();
     assertFalse(copier.isOutputFull());
 
     result = fixture.wrap(copier.harvest());
-    verifierGen.evolveSchema();
-    verifierGen.makeBatch2(2 * dataGen.batchSize + 1, 4 * dataGen.batchSize - 1);
-    expected = RowSets.wrap(verifierGen.batchAccessor());
+    verifierGen.next();
+    expected = RowSets.wrap(verifierGen.batch());
     RowSetUtilities.verify(expected, result);
     assertFalse(copier.isCopyPending());
 
@@ -553,31 +583,11 @@
 
   @Test
   public void testSV2() {
-    DataGen dataGen = new DataGen();
-    IndirectContainerAccessor filtered = new IndirectContainerAccessor();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), filtered);
+    ResultSetCopier copier = newCopier(new FilteredGen());
 
     copier.startOutputBatch();
-    dataGen.makeBatch(1, 10);
-
-    // Pick out every other record, in descending
-    // order.
-
-    VectorContainer container = dataGen.batchAccessor().container();
-    SelectionVector2Builder sv2Builder =
-        new SelectionVector2Builder(fixture.allocator(), container.getRecordCount());
-    for (int i = 0; i < 5; i++) {
-      sv2Builder.setNext(10 - 2 * i - 1);
-    }
-    container.buildSchema(SelectionVectorMode.TWO_BYTE);
-    filtered.addBatch(container);
-    filtered.setSelectionVector(sv2Builder.harvest(container));
-    assertEquals(5, filtered.rowCount());
-
-    copier.startInputBatch();
+    assertTrue(copier.nextInputBatch());
     copier.copyAllRows();
-    copier.releaseInputBatch();
 
     RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
         .addRow(10, "Row 10")
@@ -599,17 +609,16 @@
 
   @Test
   public void testNullable() {
-    NullableGen dataGen = new NullableGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new NullableGen());
     copier.startOutputBatch();
 
-    dataGen.makeBatch(1, 10);
-    copier.startInputBatch();
+    copier.nextInputBatch();
     copier.copyAllRows();
 
     RowSet result = fixture.wrap(copier.harvest());
-    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    NullableGen verifierGen = new NullableGen();
+    verifierGen.next();
+    RowSet expected = RowSets.wrap(verifierGen.batch());
     RowSetUtilities.verify(expected, result);
 
     copier.close();
@@ -617,17 +626,16 @@
 
   @Test
   public void testArrays() {
-    ArrayGen dataGen = new ArrayGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new ArrayGen());
     copier.startOutputBatch();
 
-    dataGen.makeBatch(1, 5);
-    copier.startInputBatch();
+    copier.nextInputBatch();
     copier.copyAllRows();
 
     RowSet result = fixture.wrap(copier.harvest());
-    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    ArrayGen verifierGen = new ArrayGen();
+    verifierGen.next();
+    RowSet expected = RowSets.wrap(verifierGen.batch());
     RowSetUtilities.verify(expected, result);
 
     copier.close();
@@ -635,17 +643,16 @@
 
   @Test
   public void testMaps() {
-    MapGen dataGen = new MapGen();
-    ResultSetCopier copier = new ResultSetCopierImpl(
-        fixture.allocator(), dataGen.batchAccessor());
+    ResultSetCopier copier = newCopier(new MapGen());
     copier.startOutputBatch();
 
-    dataGen.makeBatch(1, 5);
-    copier.startInputBatch();
+    copier.nextInputBatch();
     copier.copyAllRows();
 
     RowSet result = fixture.wrap(copier.harvest());
-    RowSet expected = RowSets.wrap(dataGen.batchAccessor());
+    MapGen verifierGen = new MapGen();
+    verifierGen.next();
+    RowSet expected = RowSets.wrap(verifierGen.batch());
     RowSetUtilities.verify(expected, result);
 
     copier.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
index 05c5430..08ad5ac 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetReader.java
@@ -18,98 +18,140 @@
 package org.apache.drill.exec.physical.resultSet.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
-import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
-import org.apache.drill.exec.physical.resultSet.ResultSetReader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
 import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.physical.rowSet.RowSetReader;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
 
 public class TestResultSetReader extends SubOperatorTest {
 
-  public static class BatchGenerator {
+  private static final TupleMetadata SCHEMA1 = new SchemaBuilder()
+      .add("id", MinorType.INT)
+      .add("name", MinorType.VARCHAR)
+      .build();
+  private static final TupleMetadata SCHEMA2 = new SchemaBuilder()
+      .addAll(SCHEMA1)
+      .add("amount", MinorType.INT)
+      .build();
+
+  public static class BatchGenerator implements UpstreamSource {
 
     private enum State { SCHEMA1, SCHEMA2 };
 
     private final ResultSetLoader rsLoader;
-    private final VectorContainerAccessor batch = new VectorContainerAccessor();
+    private VectorContainer batch;
+    private int schemaVersion;
     private State state;
+    private int batchCount;
+    private int rowCount;
+    private final int schema1Count;
+    private final int schema2Count;
+    private final int batchSize;
 
-    public BatchGenerator() {
-      TupleMetadata schema1 = new SchemaBuilder()
-          .add("id", MinorType.INT)
-          .add("name", MinorType.VARCHAR)
-          .build();
+    public BatchGenerator(int batchSize, int schema1Count, int schema2Count) {
       ResultSetOptions options = new ResultSetOptionBuilder()
-          .readerSchema(schema1)
+          .readerSchema(SCHEMA1)
           .vectorCache(new ResultVectorCacheImpl(fixture.allocator()))
           .build();
-      rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
-      state = State.SCHEMA1;
+      this.rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+      this.state = State.SCHEMA1;
+      this.batchSize = batchSize;
+      this.schemaVersion = 1;
+      this.schema1Count = schema1Count;
+      this.schema2Count = schema2Count;
     }
 
-    public void batch1(int start, int end) {
+    public void batch1() {
       Preconditions.checkState(state == State.SCHEMA1);
       rsLoader.startBatch();
       RowSetLoader writer = rsLoader.writer();
-      for (int i = start; i <= end; i++) {
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
         writer.start();
-        writer.scalar("id").setInt(i);
-        writer.scalar("name").setString("Row" + i);
+        writer.scalar("id").setInt(rowCount);
+        writer.scalar("name").setString("Row" + rowCount);
         writer.save();
       }
-      batch.addBatch(rsLoader.harvest());
+      batch = rsLoader.harvest();
+      batchCount++;
     }
 
-    public void batch2(int start, int end) {
+    public void batch2() {
       RowSetLoader writer = rsLoader.writer();
       if (state == State.SCHEMA1) {
-        ColumnMetadata balCol = MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED);
-        writer.addColumn(balCol);
+        writer.addColumn(SCHEMA2.metadata("amount"));
         state = State.SCHEMA2;
+        schemaVersion++;
       }
       rsLoader.startBatch();
-      for (int i = start; i <= end; i++) {
+      for (int i = 0; i < batchSize; i++) {
+        rowCount++;
         writer.start();
-        writer.scalar("id").setInt(i);
-        writer.scalar("name").setString("Row" + i);
-        writer.scalar("amount").setInt(i * 10);
+        writer.scalar("id").setInt(rowCount);
+        writer.scalar("name").setString("Row" + rowCount);
+        writer.scalar("amount").setInt(rowCount * 10);
         writer.save();
       }
-      batch.addBatch(rsLoader.harvest());
-    }
-
-    public BatchAccessor batchAccessor() {
-      return batch;
+      batch = rsLoader.harvest();
+      batchCount++;
     }
 
     public void close() {
       rsLoader.close();
     }
+
+    @Override
+    public boolean next() {
+      if (batchCount == schema1Count + schema2Count) {
+        return false;
+      }
+      if (batchCount < schema1Count) {
+        batch1();
+      } else {
+        batch2();
+      }
+      return true;
+    }
+
+    @Override
+    public int schemaVersion() { return schemaVersion; }
+
+    @Override
+    public VectorContainer batch() { return batch; }
+
+    @Override
+    public SelectionVector2 sv2() { return null; }
+
+    @Override
+    public void release() {
+      if (batch != null) {
+        batch.zeroVectors();
+      }
+    }
   }
 
   @Test
   public void testBasics() {
-    BatchGenerator gen = new BatchGenerator();
-    ResultSetReader rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+    PullResultSetReader rsReader = new PullResultSetReaderImpl(
+        new BatchGenerator(10, 2, 1));
 
     // Start state
-
     try {
       rsReader.reader();
       fail();
@@ -117,16 +159,15 @@
       // Expected
     }
 
-    // OK to detach with no input
-    rsReader.detach();
-    rsReader.release();
+    // Ask for the schema. Does an implicit next().
+    assertEquals(SCHEMA1, rsReader.schema());
+    assertEquals(1, rsReader.schemaVersion());
 
-    // Make a batch. Verify reader is attached.
+    // Move to the first batch.
     // (Don't need to do a full reader test, that is already done
     // elsewhere.)
-
-    gen.batch1(1, 10);
-    rsReader.start();
+    assertTrue(rsReader.next());
+    assertEquals(1, rsReader.schemaVersion());
     RowSetReader reader1;
     {
       RowSetReader reader = rsReader.reader();
@@ -135,18 +176,10 @@
       assertEquals(1, reader.scalar("id").getInt());
       assertEquals("Row1", reader.scalar("name").getString());
     }
-    rsReader.release();
-    try {
-      rsReader.reader();
-      fail();
-    } catch (IllegalStateException e) {
-      // Expected
-    }
 
-    // Another batch of same schema
-
-    gen.batch1(11, 20);
-    rsReader.start();
+    // Second batch, same schema.
+    assertTrue(rsReader.next());
+    assertEquals(1, rsReader.schemaVersion());
     {
       RowSetReader reader = rsReader.reader();
       assertSame(reader1, reader);
@@ -155,12 +188,10 @@
       assertEquals(11, reader.scalar("id").getInt());
       assertEquals("Row11", reader.scalar("name").getString());
     }
-    rsReader.release();
 
     // Batch with new schema
-
-    gen.batch2(21, 30);
-    rsReader.start();
+    assertTrue(rsReader.next());
+    assertEquals(2, rsReader.schemaVersion());
     {
       RowSetReader reader = rsReader.reader();
       assertNotSame(reader1, reader);
@@ -170,23 +201,50 @@
       assertEquals("Row21", reader.scalar("name").getString());
       assertEquals(210, reader.scalar("amount").getInt());
     }
-    rsReader.release();
 
+    assertFalse(rsReader.next());
     rsReader.close();
   }
 
   @Test
   public void testCloseAtStart() {
-    BatchGenerator gen = new BatchGenerator();
-    ResultSetReaderImpl rsReader = new ResultSetReaderImpl(gen.batchAccessor());
+    PullResultSetReader rsReader = new PullResultSetReaderImpl(
+        new BatchGenerator(10, 2, 1));
 
     // Close OK in start state
-
     rsReader.close();
-    assertEquals(ResultSetReaderImpl.State.CLOSED, rsReader.state());
 
     // Second close OK
+    rsReader.close();
+  }
 
+  @Test
+  public void testCloseDuringRead() {
+    PullResultSetReader rsReader = new PullResultSetReaderImpl(
+        new BatchGenerator(10, 2, 1));
+
+    // Move to the first batch and read part of it
+    assertTrue(rsReader.next());
+    RowSetReader reader = rsReader.reader();
+    assertTrue(reader.next());
+
+    // Close OK in mid-read state
+    rsReader.close();
+
+    // Second close OK
+    rsReader.close();
+  }
+
+  @Test
+  public void testCloseAfterNext() {
+    PullResultSetReader rsReader = new PullResultSetReaderImpl(
+        new BatchGenerator(10, 2, 1));
+
+    // Move to first batch
+    assertTrue(rsReader.next());
+
+    // Close OK after next(), before reading
+    rsReader.close();
+
+    // Second close OK
     rsReader.close();
   }
 }
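
Taken together, these tests walk the entire pull protocol. As a minimal
sketch (reusing the BatchGenerator source defined above, with error
handling elided), the intended usage is:

    PullResultSetReader rsReader = new PullResultSetReaderImpl(
        new BatchGenerator(10, 2, 1));
    while (rsReader.next()) {                  // Advance to the next batch
      RowSetReader reader = rsReader.reader(); // Obtain the per-batch reader
      while (reader.next()) {                  // Advance to the next row
        int id = reader.scalar("id").getInt();
      }
    }
    rsReader.close();                          // Release remaining resources
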
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index db1ab11..0458aee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -29,7 +29,6 @@
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.physical.rowSet.DirectRowSet;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
@@ -321,8 +320,7 @@
           RowSet result = client.queryBuilder().sql(sql).rowSet();
           assertEquals(4, result.rowCount());
           result.clear();
-        } catch (RpcException e) {
-          assertTrue(e.getCause() instanceof UserRemoteException);
+        } catch (UserRemoteException e) {
           sawError = true;
           break;
         }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
index 6acdc9c..c7ddd9e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockPlugin.java
@@ -29,8 +29,8 @@
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.QueryBuilder.QuerySummary;
 import org.apache.drill.test.QueryRowSetIterator;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetReader;
 import org.junit.BeforeClass;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
index 74a8e65..7618456 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
@@ -27,6 +27,8 @@
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Drill query event listener that buffers rows into a producer-consumer
@@ -36,13 +38,10 @@
  * Query messages are transformed into events: query ID, batch,
  * EOF or error.
  */
+public class BufferingQueryEventListener implements UserResultsListener {
+  private static final Logger logger = LoggerFactory.getLogger(BufferingQueryEventListener.class);
 
-public class BufferingQueryEventListener implements UserResultsListener
-{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferingQueryEventListener.class);
-
-  public static class QueryEvent
-  {
+  public static class QueryEvent {
     public enum Type { QUERY_ID, BATCH, EOF, ERROR }
 
     public final Type type;
@@ -72,7 +71,7 @@
     }
   }
 
-  private BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
+  private final BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
 
   @Override
   public void queryIdArrived(QueryId queryId) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
index df3a217..c898e0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
@@ -18,8 +18,9 @@
 package org.apache.drill.test;
 
 import ch.qos.logback.classic.Level;
-import org.apache.drill.exec.client.LoggingResultsListener;
+
 import org.apache.drill.common.util.function.CheckedSupplier;
+import org.apache.drill.exec.client.LoggingResultsListener;
 import org.apache.drill.exec.util.VectorUtil;
 
 import java.util.function.Supplier;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
index 8577448..e4dcd98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -39,6 +39,8 @@
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Parses a query profile and provides access to various bits of the profile
@@ -46,7 +48,7 @@
  */
 
 public class ProfileParser {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileParser.class);
+  private static final Logger logger = LoggerFactory.getLogger(ProfileParser.class);
 
   /**
    * The original JSON profile.
@@ -394,7 +396,7 @@
           logger.info("Can't find operator def: {}-{}", major.id, op.opId);
           continue;
         }
-        op.opName = CoreOperatorType.valueOf(op.type).name();
+        op.opName = CoreOperatorType.forNumber(op.type).name();
         op.opName = op.opName.replace("_", " ");
         op.name = opDef.name;
         if (op.name.equalsIgnoreCase(op.opName)) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
new file mode 100644
index 0000000..2bace96
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+
+/**
+ * Iterator over batches returned from a query. Uses a listener to obtain
+ * the serialized batches, then decodes them into value vectors held
+ * in a vector container - the same structure as used in the query
+ * executor. This format allows us to use the "row set" classes on the
+ * output of the query.
+ */
+public class QueryBatchIterator implements UpstreamSource, AutoCloseable {
+
+  private enum State { START, RUN, RETAIN, EOF }
+
+  private final BufferingQueryEventListener listener;
+  private final RecordBatchLoader loader;
+  private State state = State.START;
+  private boolean retainData;
+  private QueryId queryId;
+  private QueryState queryState;
+  private int schemaVersion;
+  private int recordCount;
+  private int batchCount;
+
+  public QueryBatchIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
+    this.listener = listener;
+    this.loader = new RecordBatchLoader(allocator);
+  }
+
+  public QueryId queryId() { return queryId; }
+  public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
+  public QueryState finalState() { return queryState; }
+  public int batchCount() { return batchCount; }
+  public int rowCount() { return recordCount; }
+
+  @Override
+  public boolean next() {
+    retainData = false;
+    if (state == State.EOF) {
+      return false;
+    }
+    while (true) {
+      QueryEvent event = listener.get();
+      queryState = event.state;
+      switch (event.type) {
+        case BATCH:
+
+          // Skip over null batches
+          if (loadBatch(event)) {
+            return true;
+          }
+          break;
+        case EOF:
+          state = State.EOF;
+          return false;
+        case ERROR:
+          state = State.EOF;
+          if (event.error instanceof UserException) {
+            throw (UserException) event.error;
+          } else {
+            throw new RuntimeException(event.error);
+          }
+        case QUERY_ID:
+          queryId = event.queryId;
+          break;
+        default:
+          throw new IllegalStateException("Unexpected event: " + event.type);
+      }
+    }
+  }
+
+  private boolean loadBatch(QueryEvent event) {
+    QueryDataBatch inputBatch = Preconditions.checkNotNull(event.batch);
+    batchCount++;
+    recordCount += inputBatch.getHeader().getRowCount();
+
+    // Unload the batch and convert to a row set.
+    loader.load(inputBatch.getHeader().getDef(), inputBatch.getData());
+    inputBatch.release();
+    VectorContainer batch = loader.getContainer();
+    batch.setRecordCount(loader.getRecordCount());
+
+    // Null results? Drill will return a single batch with no rows
+    // and no columns even if the scan (or other) operator returns
+    // no batches at all. For ease of testing, simply map this null
+    // result set to a null output row set that says "nothing at all
+    // was returned." Note that this is different from an empty result
+    // set, which has a schema but no rows.
+    if (batch.getRecordCount() == 0 && batch.getNumberOfColumns() == 0) {
+      release();
+      return false;
+    }
+
+    if (state == State.START || batch.isSchemaChanged()) {
+      schemaVersion++;
+    }
+    state = State.RUN;
+    return true;
+  }
+
+  @Override
+  public int schemaVersion() { return schemaVersion; }
+
+  @Override
+  public VectorContainer batch() { return loader.getContainer(); }
+
+  @Override
+  public SelectionVector2 sv2() { return null; }
+
+  @Override
+  public void release() {
+    loader.clear();
+  }
+
+  public void retainData() {
+    retainData = true;
+  }
+
+  @Override
+  public void close() {
+    if (!retainData) {
+      release();
+    }
+
+    // Consume any pending input, releasing the buffers of any
+    // batches still in flight.
+    while (state != State.EOF) {
+      QueryEvent event = listener.get();
+      if (event.type == QueryEvent.Type.EOF) {
+        state = State.EOF;
+      } else if (event.type == QueryEvent.Type.BATCH) {
+        event.batch.release();
+      }
+    }
+  }
+}
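
For illustration, a minimal sketch of driving QueryBatchIterator directly.
It assumes the query was submitted with the BufferingQueryEventListener by
some means not shown here, and that "allocator" stands in for the test's
BufferAllocator:

    BufferingQueryEventListener listener = new BufferingQueryEventListener();
    // ... submit the query with this listener ...
    try (QueryBatchIterator iter = new QueryBatchIterator(allocator, listener)) {
      while (iter.next()) {
        VectorContainer batch = iter.batch();
        // Inspect the batch here; the loader reuses its container on
        // the next call to next().
      }
    }
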
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 4828f6c..e451dbb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.test;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -58,11 +60,8 @@
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
-import org.apache.drill.test.ClientFixture.StatementParser;
 import org.joda.time.Period;
 
-import static org.junit.Assert.assertEquals;
-
 /**
  * Builder for a Drill query. Provides all types of query formats,
  * and a variety of ways to run the query.
@@ -121,21 +120,18 @@
    * The future used to wait for the completion of an async query. Returns
    * just the summary of the query.
    */
-
   public static class QuerySummaryFuture implements Future<QuerySummary> {
 
     /**
      * Synchronizes the listener thread and the test thread that
      * launched the query.
      */
-
     private final CountDownLatch lock = new CountDownLatch(1);
     private QuerySummary summary;
 
     /**
      * Unsupported at present.
      */
-
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
       throw new UnsupportedOperationException();
@@ -144,7 +140,6 @@
     /**
      * Always returns false.
      */
-
     @Override
     public boolean isCancelled() { return false; }
 
@@ -160,7 +155,6 @@
     /**
      * Not supported at present, just does a non-timeout get.
      */
-
     @Override
     public QuerySummary get(long timeout, TimeUnit unit) throws InterruptedException {
       return get();
@@ -345,56 +339,32 @@
    */
   public DirectRowSet rowSet() throws RpcException {
 
-    // Ignore all but the first non-empty batch.
-    // Always return the last batch, which may be empty.
-
-    QueryDataBatch resultBatch = null;
-    for (QueryDataBatch batch : results()) {
-      if (resultBatch == null) {
-        resultBatch = batch;
-      } else if (resultBatch.getHeader().getRowCount() == 0) {
-        resultBatch.release();
-        resultBatch = batch;
-      } else if (batch.getHeader().getRowCount() > 0) {
-        throw new IllegalStateException("rowSet() returns a single batch, but this query returned multiple batches. Consider rowSetIterator() instead.");
-      } else {
-        batch.release();
+    VectorContainer batch = null;
+    try (QueryBatchIterator iter = new QueryBatchIterator(client.allocator(), withEventListener())) {
+      while (iter.next()) {
+        batch = iter.batch();
+        if (batch.getRecordCount() != 0) {
+          iter.retainData();
+          break;
+        }
       }
+      iter.retainData();
     }
-
-    // No results?
-
-    if (resultBatch == null) {
+    if (batch == null) {
       return null;
+    } else {
+      return DirectRowSet.fromContainer(batch);
     }
-
-    // Unload the batch and convert to a row set.
-
-    RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
-    loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
-    resultBatch.release();
-    VectorContainer container = loader.getContainer();
-    container.setRecordCount(loader.getRecordCount());
-
-    // Null results? Drill will return a single batch with no rows
-    // and no columns even if the scan (or other) operator returns
-    // no batches at all. For ease of testing, simply map this null
-    // result set to a null output row set that says "nothing at all
-    // was returned." Note that this is different than an empty result
-    // set which has a schema, but no rows.
-
-    if (container.getRecordCount() == 0 && container.getNumberOfColumns() == 0) {
-      container.clear();
-      return null;
-    }
-
-    return DirectRowSet.fromContainer(container);
   }
 
   public QueryRowSetIterator rowSetIterator() {
     return new QueryRowSetIterator(client.allocator(), withEventListener());
   }
 
+  public QueryRowSetReader rowSetReader() {
+    return QueryRowSetReader.build(client.allocator(), withEventListener());
+  }
+
   /**
    * Run the query which expect to return vector {@code V} representation
    * of type {@code T} for the column {@code columnName}.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index 3a3fe67..d8e2da1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -30,15 +30,14 @@
  * a very easy way for tests to work with query data using the
  * row set tools.
  */
-
 public class QueryResultSet {
   private final BufferingQueryEventListener listener;
   private boolean eof;
-  private int recordCount = 0;
-  private int batchCount = 0;
-  private QueryId queryId = null;
+  private int recordCount;
+  private int batchCount;
+  private QueryId queryId;
   @SuppressWarnings("unused")
-  private QueryState state = null;
+  private QueryState state;
   final RecordBatchLoader loader;
 
   public QueryResultSet(BufferingQueryEventListener listener, BufferAllocator allocator) {
@@ -53,7 +52,6 @@
    * @return the next batch as a row set, or null if EOF
    * @throws Exception on a server error
    */
-
   public DirectRowSet next() throws Exception {
     if (eof) {
       return null;
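
As a sketch, a test drains a QueryResultSet like this (next() returns null
at EOF and throws on a server error, so the test method must declare
throws Exception):

    QueryResultSet results = new QueryResultSet(listener, client.allocator());
    DirectRowSet rowSet;
    while ((rowSet = results.next()) != null) {
      // Verify the batch, then free its buffers.
      rowSet.clear();
    }
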
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
index acc5056..37e7aeb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java
@@ -24,74 +24,32 @@
 import org.apache.drill.exec.physical.rowSet.RowSetFormatter;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
 
+/**
+ * Converts an incoming set of record batches into an iterator over a
+ * set of row sets. Primarily for testing.
+ */
 public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<DirectRowSet> {
-  private final BufferingQueryEventListener listener;
-  private final BufferAllocator allocator;
-  private int recordCount;
-  private int batchCount;
-  private QueryId queryId;
-  private QueryDataBatch batch;
-  private QueryState state;
+  private final QueryBatchIterator batchIter;
 
-  QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
-    this.allocator = allocator;
-    this.listener = listener;
+  public QueryRowSetIterator(BufferAllocator allocator, BufferingQueryEventListener listener) {
+    batchIter = new QueryBatchIterator(allocator, listener);
   }
 
-  public QueryId queryId() { return queryId; }
-  public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
-  public QueryState finalState() { return state; }
-  public int batchCount() { return batchCount; }
-  public int rowCount() { return recordCount; }
+  public QueryId queryId() { return batchIter.queryId(); }
+  public String queryIdString() { return batchIter.queryIdString(); }
+  public QueryState finalState() { return batchIter.finalState(); }
+  public int batchCount() { return batchIter.batchCount(); }
+  public int rowCount() { return batchIter.rowCount(); }
 
   @Override
   public boolean hasNext() {
-    while (true) {
-      QueryEvent event = listener.get();
-      state = event.state;
-      batch = null;
-      switch (event.type) {
-      case BATCH:
-        batchCount++;
-        recordCount += event.batch.getHeader().getRowCount();
-        batch = event.batch;
-        return true;
-      case EOF:
-        state = event.state;
-        return false;
-      case ERROR:
-        throw new RuntimeException(event.error);
-      case QUERY_ID:
-        queryId = event.queryId;
-        break;
-      default:
-        throw new IllegalStateException("Unexpected event: " + event.type);
-      }
-    }
+    return batchIter.next();
   }
 
   @Override
   public DirectRowSet next() {
-
-    if (batch == null) {
-      throw new IllegalStateException();
-    }
-
-    // Unload the batch and convert to a row set.
-
-    final RecordBatchLoader loader = new RecordBatchLoader(allocator);
-    loader.load(batch.getHeader().getDef(), batch.getData());
-    batch.release();
-    batch = null;
-    VectorContainer container = loader.getContainer();
-    container.setRecordCount(loader.getRecordCount());
-    return DirectRowSet.fromContainer(container);
+    return DirectRowSet.fromContainer(batchIter.batch());
   }
 
   public void printAll() {
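
Because QueryRowSetIterator is both an Iterator and an Iterable, a test can
consume query results with a for-each loop. A sketch, assuming a client
fixture of the kind used elsewhere in this patch:

    for (DirectRowSet rowSet : client.queryBuilder().sql(sql).rowSetIterator()) {
      // Examine or print the row set, then free its buffers.
      rowSet.clear();
    }
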
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java
new file mode 100644
index 0000000..8703f28
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
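+/**
+ * Pull-style reader for the result set of a query, for use in tests.
+ * Wraps a {@link QueryBatchIterator} to fetch each batch from the
+ * query, then presents the batches through the pull result set
+ * reader protocol.
+ */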
+public class QueryRowSetReader extends PullResultSetReaderImpl {
+
+  private final QueryBatchIterator batchIter;
+
+  public QueryRowSetReader(QueryBatchIterator batchIter) {
+    super(batchIter);
+    this.batchIter = batchIter;
+  }
+
+  public static QueryRowSetReader build(BufferAllocator allocator, BufferingQueryEventListener listener) {
+    return new QueryRowSetReader(new QueryBatchIterator(allocator, listener));
+  }
+
+  public QueryId queryId() { return batchIter.queryId(); }
+  public String queryIdString() { return batchIter.queryIdString(); }
+
+  @Override
+  public void close() {
+    batchIter.close();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java
new file mode 100644
index 0000000..70c4260
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/StatementParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+/**
+ * Very simple parser for semicolon-separated lists of SQL statements
+ * which handles quoted semicolons. Drill can execute only one statement
+ * at a time (without a trailing semicolon). This parser breaks a
+ * statement list into single statements. Input:<pre><code>
+ * USE a.b;
+ * ALTER SESSION SET `foo` = ";";
+ * SELECT * FROM bar WHERE x = "\";";
+ * </code></pre>Output:
+ * <ul>
+ * <li><tt>USE a.b</tt></li>
+ * <li><tt>ALTER SESSION SET `foo` = ";"</tt></li>
+ * <li><tt>SELECT * FROM bar WHERE x = "\";"</tt></li>
+ * </ul>
+ */
+public class StatementParser {
+  private final Reader in;
+
+  public StatementParser(Reader in) {
+    this.in = in;
+  }
+
+  public StatementParser(String text) {
+    this(new StringReader(text));
+  }
+
+  public String parseNext() throws IOException {
+    boolean eof = false;
+    StringBuilder buf = new StringBuilder();
+    while (true) {
+      int c = in.read();
+      if (c == -1) {
+        eof = true;
+        break;
+      }
+      if (c == ';') {
+        break;
+      }
+      buf.append((char) c);
+      if (c == '"' || c == '\'' || c == '`') {
+        int quote = c;
+        boolean escape = false;
+        while (true) {
+          c = in.read();
+          if (c == -1) {
+            throw new IllegalArgumentException("Unclosed quote: " + (char) quote);
+          }
+          buf.append((char) c);
+          if (! escape && c == quote) {
+            break;
+          }
+          escape = !escape && c == '\\';
+        }
+      }
+    }
+    String stmt = buf.toString().trim();
+    if (stmt.isEmpty() && eof) {
+      return null;
+    }
+    return stmt;
+  }
+}
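
A sketch of the intended use. parseNext() returns null at end of input and
throws IOException on read errors:

    StatementParser parser = new StatementParser("USE a.b; SELECT 1");
    String stmt;
    while ((stmt = parser.parseNext()) != null) {
      // Run each statement in turn.
    }
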
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
index 334e90d..925130f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetUtilities.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -35,7 +36,6 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
-import org.bouncycastle.util.Arrays;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.LocalDate;
@@ -45,7 +45,6 @@
 /**
  * Various utilities useful for working with row sets, especially for testing.
  */
-
 public class RowSetUtilities {
 
   private RowSetUtilities() { }
@@ -55,7 +54,6 @@
    * and easy way to reverse the sort order of an expected-value row set.
    * @param sv2 the SV2 which is reversed in place
    */
-
   public static void reverse(SelectionVector2 sv2) {
     int count = sv2.getCount();
     for (int i = 0; i < count / 2; i++) {
@@ -164,7 +162,7 @@
         byte[] expected = (byte[]) expectedObj;
         byte[] actual = (byte[]) actualObj;
         assertEquals(msg + " - byte lengths differ", expected.length, actual.length);
-        assertTrue(msg, Arrays.areEqual(expected, actual));
+        assertTrue(msg, Arrays.equals(expected, actual));
         break;
      }
      case DOUBLE:
@@ -280,6 +278,10 @@
     new RowSetComparison(expected).verifyAndClearAll(actual);
   }
 
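+  /**
+   * Verify that the first {@code rowCount} rows of the actual row set
+   * match the expected row set, then clear both row sets.
+   */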
+  public static void verify(RowSet expected, RowSet actual, int rowCount) {
+    new RowSetComparison(expected).span(rowCount).verifyAndClearAll(actual);
+  }
+
   public static BigDecimal dec(String value) {
     return new BigDecimal(value);
   }