DRILL-7487: Removes the unused OUT_OF_MEMORY iterator status

See JIRA ticket for full explanation.

closes #1930
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 414d583..2107094 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -36,6 +36,7 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
@@ -201,6 +202,15 @@
    */
   MetastoreRegistry getMetastoreRegistry();
 
+  /**
+   * An operator is experiencing memory pressure. Asks the fragment
+   * executor to poll all operators to release an optional memory
+   * (such as by spilling.) The request is advisory. The caller should
+   * again try to allocate memory, and if the second request fails,
+   * throw an <code>OutOfMemoryException</code>.
+   */
+  void requestMemory(RecordBatch requestor);
+
   interface ExecutorState {
     /**
      * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index cb35318..653241c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -54,6 +54,7 @@
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.UserClientConnection;
@@ -74,36 +75,46 @@
 import org.apache.drill.metastore.MetastoreRegistry;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * <p>
- *   This is the core Context which implements all the Context interfaces:
+ * This is the core Context which implements all the Context interfaces:
  *
- *   <ul>
- *     <li>{@link FragmentContext}: A context provided to non-exchange operators.</li>
- *     <li>{@link ExchangeFragmentContext}: A context provided to exchange operators.</li>
- *     <li>{@link RootFragmentContext}: A context provided to fragment roots.</li>
- *     <li>{@link ExecutorFragmentContext}: A context used by the Drillbit.</li>
- *   </ul>
+ * <ul>
+ * <li>{@link FragmentContext}: A context provided to non-exchange
+ * operators.</li>
+ * <li>{@link ExchangeFragmentContext}: A context provided to exchange
+ * operators.</li>
+ * <li>{@link RootFragmentContext}: A context provided to fragment roots.</li>
+ * <li>{@link ExecutorFragmentContext}: A context used by the Drillbit.</li>
+ * </ul>
  *
- *   The interfaces above expose resources to varying degrees. They are ordered from most restrictive ({@link FragmentContext})
- *   to least restrictive ({@link ExecutorFragmentContext}).
+ * The interfaces above expose resources to varying degrees. They are ordered
+ * from most restrictive ({@link FragmentContext}) to least restrictive
+ * ({@link ExecutorFragmentContext}).
  * </p>
  * <p>
- *   Since {@link FragmentContextImpl} implements all of the interfaces listed above, the facade pattern is used in order
- *   to cast a {@link FragmentContextImpl} object to the desired interface where-ever it is needed. The facade pattern
- *   is powerful since it allows us to easily create minimal context objects to be used in unit tests. Without
- *   the use of interfaces and the facade pattern we would have to create a complete {@link FragmentContextImpl} object
- *   to unit test any part of the code that depends on a context.
+ * Since {@link FragmentContextImpl} implements all of the interfaces listed
+ * above, the facade pattern is used in order to cast a
+ * {@link FragmentContextImpl} object to the desired interface where-ever it is
+ * needed. The facade pattern is powerful since it allows us to easily create
+ * minimal context objects to be used in unit tests. Without the use of
+ * interfaces and the facade pattern we would have to create a complete
+ * {@link FragmentContextImpl} object to unit test any part of the code that
+ * depends on a context.
  * </p>
  * <p>
- *  <b>General guideline:</b> Use the most narrow interface for the task. For example, "internal" operators don't need visibility to the networking functionality.
- *  Using the narrow interface allows unit testing without using mocking libraries. Often, the surrounding structure already has exposed the most narrow interface. If there are
- *  opportunities to clean up older code, we can do so as needed to make testing easier.
+ * <b>General guideline:</b> Use the most narrow interface for the task. For
+ * example, "internal" operators don't need visibility to the networking
+ * functionality. Using the narrow interface allows unit testing without using
+ * mocking libraries. Often, the surrounding structure already has exposed the
+ * most narrow interface. If there are opportunities to clean up older code, we
+ * can do so as needed to make testing easier.
  * </p>
  */
 public class FragmentContextImpl extends BaseFragmentContext implements ExecutorFragmentContext {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContextImpl.class);
+  private static final Logger logger = LoggerFactory.getLogger(FragmentContextImpl.class);
 
   private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = new HashMap<>();
   private final List<OperatorContextImpl> contexts = new LinkedList<>();
@@ -120,8 +131,8 @@
   private final BufferManager bufferManager;
   private ExecutorState executorState;
   private final ExecutionControls executionControls;
-  private boolean enableRuntimeFilter;
-  private boolean enableRFWaiting;
+  private final boolean enableRuntimeFilter;
+  private final boolean enableRFWaiting;
   private Lock lock4RF;
   private Condition condition4RF;
 
@@ -145,8 +156,8 @@
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
-  private Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
-  private Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
+  private final Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
+  private final Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -625,4 +636,15 @@
   public MetastoreRegistry getMetastoreRegistry() {
     return context.getMetastoreRegistry();
   }
+
+  @Override
+  public void requestMemory(RecordBatch requestor) {
+    // Does not actually do anything yet. Should ask the fragment
+    // executor to talk the tree, except for the given batch,
+    // and request the operator free up memory. Then, if the
+    // requestor is above its allocator limit, increase that
+    // limit. Since this operation is purely optional, we leave
+    // it as a stub for now. (No operator ever actually used the
+    // old OUT_OF_MEMORY iterator status that this method replaces.)
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2ea8d08..b778937 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -83,8 +83,6 @@
       IterOutcome outcome = next(incoming);
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
-      case OUT_OF_MEMORY:
-        throw new OutOfMemoryException();
       case STOP:
         return false;
       case NONE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 55e7cd5..c23ac44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -49,10 +49,10 @@
 
     private final FragmentHandle oppositeHandle;
 
-    private RecordBatch incoming;
+    private final RecordBatch incoming;
     private AccountingDataTunnel tunnel;
-    private FragmentHandle handle;
-    private int recMajor;
+    private final FragmentHandle handle;
+    private final int recMajor;
     private volatile boolean ok = true;
     private volatile boolean done = false;
 
@@ -95,10 +95,7 @@
         incoming.kill(true);
         out = IterOutcome.NONE;
       }
-//      logger.debug("Outcome of sender next {}", out);
       switch (out) {
-      case OUT_OF_MEMORY:
-        throw new OutOfMemoryException();
       case STOP:
       case NONE:
         // if we didn't do anything yet, send an empty schema.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
index 6cad071..93aadc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
@@ -96,7 +96,6 @@
         upstream = next(incoming);
 
         switch(upstream) {
-          case OUT_OF_MEMORY:
           case STOP:
             return upstream;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index baef314..555b78d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -94,7 +94,7 @@
   private BatchSchema schema;
   private boolean schemaChanged = false;
   private PriorityQueue priorityQueue;
-  private TopN config;
+  private final TopN config;
   private SelectionVector4 sv4;
   private long countSincePurge;
   private int batchCount;
@@ -156,9 +156,6 @@
       case STOP:
         state = BatchState.STOP;
         return;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        return;
       case NONE:
         state = BatchState.DONE;
         return;
@@ -221,7 +218,6 @@
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
-        case OUT_OF_MEMORY:
         case STOP:
           return lastKnownOutcome;
         case OK_NEW_SCHEMA:
@@ -663,7 +659,7 @@
   }
 
   public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
-    private SelectionVector4 sv4;
+    private final SelectionVector4 sv4;
 
     public SimpleSV4RecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
       super(container, context);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 4b63946..60a6cf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -90,7 +90,6 @@
         upstream = next(incoming);
 
         switch(upstream) {
-          case OUT_OF_MEMORY:
           case STOP:
             return upstream;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 38fb14e..615c8ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -99,7 +99,7 @@
   private boolean firstBatch = true;
 
   // This map saves the mapping between outgoing column and incoming column.
-  private Map<String, String> columnMapping;
+  private final Map<String, String> columnMapping;
   private final HashAggMemoryManager hashAggMemoryManager;
 
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
@@ -252,9 +252,6 @@
         state = BatchState.DONE;
         container.buildSchema(SelectionVectorMode.NONE);
         return;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        return;
       case STOP:
         state = BatchState.STOP;
         return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 7fcfd99..1c2cd85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -649,7 +649,6 @@
       }
       // Handle various results from getting the next batch
       switch (outcome) {
-        case OUT_OF_MEMORY:
         case NOT_YET:
           return AggOutcome.RETURN_OUTCOME;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 586fa32..9e9a097 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -172,9 +172,6 @@
         state = BatchState.DONE;
         container.buildSchema(SelectionVectorMode.NONE);
         return;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        return;
       case STOP:
         state = BatchState.STOP;
         return;
@@ -242,7 +239,6 @@
             return IterOutcome.OK;
           }
           // else fall thru
-        case OUT_OF_MEMORY:
         case NOT_YET:
         case STOP:
           return lastKnownOutcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 173ed6a..1ada0ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,9 +136,6 @@
                     currentIndex = this.getVectorIndex(underlyingIndex);
                     break outer;
                   }
-                case OUT_OF_MEMORY:
-                  outcome = out;
-                  return AggOutcome.RETURN_OUTCOME;
                 case EMIT:
                   outerOutcome = EMIT;
                   if (incoming.getRecordCount() == 0) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 4207ef4..be60982 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -95,8 +95,6 @@
     RecordBatch.IterOutcome out = next(incoming);
     logger.debug("Outcome of sender next {}", out);
     switch(out){
-      case OUT_OF_MEMORY:
-        throw new OutOfMemoryException();
       case STOP:
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index cded844..18af718 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -453,10 +453,7 @@
 
     isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
 
-    if (outcome == IterOutcome.OUT_OF_MEMORY) {
-      // We reached a termination state
-      state = BatchState.OUT_OF_MEMORY;
-    } else if (outcome == IterOutcome.STOP) {
+    if (outcome == IterOutcome.STOP) {
       // We reached a termination state
       state = BatchState.STOP;
     } else {
@@ -563,11 +560,6 @@
             state = BatchState.STOP;
             return leftUpstream;
           }
-
-          if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-            state = BatchState.OUT_OF_MEMORY;
-            return leftUpstream;
-          }
         }
 
         // Update the hash table related stats for the operator
@@ -1009,7 +1001,6 @@
     boolean moreData = true;
     while (moreData) {
       switch (rightUpstream) {
-      case OUT_OF_MEMORY:
       case NONE:
       case NOT_YET:
       case STOP:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index d53fb36..00c9363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -53,7 +53,6 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 /**
@@ -413,7 +412,7 @@
   }
 
   private boolean isTerminalOutcome(IterOutcome outcome) {
-    return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
+    return (outcome == STOP || outcome == NONE);
   }
 
   /**
@@ -477,7 +476,6 @@
             leftJoinIndex = 0;
           }
           break;
-        case OUT_OF_MEMORY:
         case NONE:
         case STOP:
           // Not using =0 since if outgoing container is empty then no point returning anything
@@ -547,7 +545,6 @@
           rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
           needNewRightBatch = false;
           break;
-        case OUT_OF_MEMORY:
         case NONE:
         case STOP:
           needNewRightBatch = false;
@@ -925,9 +922,6 @@
       case EMIT:
         state = BatchState.STOP;
         return false;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        return false;
       case NONE:
       case NOT_YET:
         state = BatchState.DONE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index c84f954..2dd5973 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -177,8 +177,6 @@
               batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
             addBatchToHyperContainer(right);
             break;
-          case OUT_OF_MEMORY:
-            return IterOutcome.OUT_OF_MEMORY;
           case NONE:
           case STOP:
             //TODO we got a STOP, shouldn't we stop immediately ?
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 2910da5..e580dfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -52,7 +52,7 @@
   private IterOutcome rightUpstream = IterOutcome.NONE;
   private final List<TransferPair> transfers = Lists.newArrayList();
   private int recordCount = 0;
-  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private RowKeyJoinState rkJoinState = RowKeyJoinState.INITIAL;
 
   public RowKeyJoinBatch(RowKeyJoinPOP config, FragmentContext context, RecordBatch left, RecordBatch right)
@@ -100,11 +100,6 @@
 
     leftUpstream = next(left);
 
-    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return;
-    }
-
     for (final VectorWrapper<?> v : left) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
           container.addOrGet(v.getField(), callBack));
@@ -143,7 +138,6 @@
 
       switch(rightUpstream) {
       case NONE:
-      case OUT_OF_MEMORY:
       case STOP:
         rkJoinState = RowKeyJoinState.DONE;
         state = BatchState.DONE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 25c79ac..0bbaba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -64,9 +64,6 @@
       incoming.kill(true);
 
       IterOutcome upStream = next(incoming);
-      if (upStream == IterOutcome.OUT_OF_MEMORY) {
-        return upStream;
-      }
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
         // Clear the memory for the incoming batch
@@ -78,9 +75,6 @@
           incomingSv.clear();
         }
         upStream = next(incoming);
-        if (upStream == IterOutcome.OUT_OF_MEMORY) {
-          return upStream;
-        }
       }
       // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
       if (upStream == EMIT) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 73a55b0..15b103d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -159,7 +159,6 @@
           // all incoming data was processed when returned OK_NEW_SCHEMA
           finishedLeft = !firstLeft;
           break outer;
-        case OUT_OF_MEMORY:
         case NOT_YET:
         case STOP:
           return outcome;
@@ -210,7 +209,6 @@
           // all incoming data was processed
           finishedRight = true;
           break outer;
-        case OUT_OF_MEMORY:
         case NOT_YET:
         case STOP:
           return outcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 5c45150..c302ef2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -124,7 +124,6 @@
         assert !firstBatch : "First batch should be OK_NEW_SCHEMA";
         doWorkInternal();
         // fall thru
-      case OUT_OF_MEMORY:
       case NOT_YET:
       case STOP:
         return outcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5aed88e..6d495a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -61,11 +61,11 @@
 
 public class PartitionSenderRootExec extends BaseRootExec {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
-  private RecordBatch incoming;
-  private HashPartitionSender operator;
+  private final RecordBatch incoming;
+  private final HashPartitionSender operator;
   private PartitionerDecorator partitioner;
 
-  private ExchangeFragmentContext context;
+  private final ExchangeFragmentContext context;
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final double cost;
@@ -74,14 +74,14 @@
   private final AtomicInteger remaingReceiverCount;
   private boolean done = false;
   private boolean first = true;
-  private boolean closeIncoming;
+  private final boolean closeIncoming;
 
   long minReceiverRecordCount = Long.MAX_VALUE;
   long maxReceiverRecordCount = Long.MIN_VALUE;
   protected final int numberPartitions;
   protected final int actualPartitions;
 
-  private IntArrayList terminations = new IntArrayList();
+  private final IntArrayList terminations = new IntArrayList();
 
   public enum Metric implements MetricDef {
     BATCHES_SENT,
@@ -175,9 +175,6 @@
         }
         return false;
 
-      case OUT_OF_MEMORY:
-        throw new OutOfMemoryException();
-
       case STOP:
         if (partitioner != null) {
           partitioner.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 9385400..633920e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -133,9 +133,6 @@
             case NONE:
               stop = true;
               break outer;
-            case OUT_OF_MEMORY:
-              queue.putFirst(RecordBatchDataWrapper.outOfMemory());
-              return;
             case STOP:
               queue.putFirst(RecordBatchDataWrapper.failed());
               return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 185b8f0..97afac0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -173,10 +173,7 @@
 
           next = next(incoming);
           setLastKnownOutcome(next);
-          if (next == IterOutcome.OUT_OF_MEMORY) {
-            outOfMemory = true;
-            return next;
-          } else if (next == IterOutcome.NONE) {
+          if (next == IterOutcome.NONE) {
             // since this is first batch and we already got a NONE, need to set up the schema
             doAlloc(0);
             setValueCount(0);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index fcbb10e..a06f1d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -105,7 +105,6 @@
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
-        case OUT_OF_MEMORY:
         case STOP:
           return upstream;
         case OK_NEW_SCHEMA:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 921c92b..104917e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -346,7 +346,6 @@
         switch (outcome) {
           case NONE:
             break outer;
-          case OUT_OF_MEMORY:
           case NOT_YET:
           case STOP:
             return outcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index ed2b66e..35f34b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -129,7 +129,6 @@
 
         switch (upstream) {
         case NONE:
-        case OUT_OF_MEMORY:
         case STOP:
           return upstream;
         case OK_NEW_SCHEMA:
@@ -391,7 +390,6 @@
               batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
               getRecordBatchStatsContext());
             return Pair.of(outcome, topStatus);
-          case OUT_OF_MEMORY:
           case STOP:
             batchStatusStack.pop();
             return Pair.of(outcome, topStatus);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index f5fb77a..8b3c2e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -195,8 +195,10 @@
       }
 
       if (context.getAllocator().isOverLimit()) {
-        lastOutcome = IterOutcome.OUT_OF_MEMORY;
-        return lastOutcome;
+        context.requestMemory(this);
+        if (context.getAllocator().isOverLimit()) {
+          throw new OutOfMemoryException("Allocator over limit");
+        }
       }
 
       RecordBatchDef rbd = batch.getHeader().getDef();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 72a337a..3562e9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -112,7 +112,6 @@
     // Process according to upstream outcome
     switch (upStream) {
       case NONE:
-      case OUT_OF_MEMORY:
       case NOT_YET:
       case STOP:
         return upStream;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 3e7ae23..7e795dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -278,10 +278,9 @@
           validationState = ValidationState.TERMINAL;
           break;
         case NOT_YET:
-        case OUT_OF_MEMORY:
-          // NOT_YET and OUT_OF_MEMORY are allowed at any time, except if
+          // NOT_YET is allowed at any time, except if
           // terminated (checked above).
-          // NOT_YET and OUT_OF_MEMORY OK don't change high-level state.
+          // NOT_YET doesn't change high-level state.
           break;
         default:
           throw new AssertionError(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 07a5c76..29c4dad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -116,7 +116,6 @@
         case NONE:
           noMoreBatches = true;
           break;
-        case OUT_OF_MEMORY:
         case NOT_YET:
         case STOP:
           cleanup();
@@ -237,9 +236,6 @@
     case STOP:
       state = BatchState.STOP;
       return;
-    case OUT_OF_MEMORY:
-      state = BatchState.OUT_OF_MEMORY;
-      return;
     default:
       break;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index c8fa175..38a8e78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -119,7 +119,7 @@
   private boolean first = true;
   private int targetRecordCount;
   private final String fileName;
-  private Set<Path> currSpillDirs = Sets.newTreeSet();
+  private final Set<Path> currSpillDirs = Sets.newTreeSet();
   private int firstSpillBatchCount = 0;
   private int peakNumBatches = -1;
 
@@ -279,9 +279,6 @@
       case STOP:
         state = BatchState.STOP;
         break;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        break;
       case NONE:
         state = BatchState.DONE;
         break;
@@ -436,19 +433,6 @@
             }
           }
           break;
-        case OUT_OF_MEMORY:
-          logger.debug("received OUT_OF_MEMORY, trying to spill");
-          if (batchesSinceLastSpill > 2) {
-            final BatchGroup merged = mergeAndSpill(batchGroups);
-            if (merged != null) {
-              spilledBatchGroups.add(merged);
-              batchesSinceLastSpill = 0;
-            }
-          } else {
-            logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
-            return IterOutcome.OUT_OF_MEMORY;
-          }
-          break;
         default:
           throw new UnsupportedOperationException();
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 32c7119..25c3eda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -288,9 +288,6 @@
       case STOP:
         state = BatchState.STOP;
         break;
-      case OUT_OF_MEMORY:
-        state = BatchState.OUT_OF_MEMORY;
-        break;
       case NONE:
         state = BatchState.DONE;
         break;
@@ -427,19 +424,6 @@
 
       sortImpl.addBatch(incoming);
       break;
-    case OUT_OF_MEMORY:
-
-      // Note: it is highly doubtful that this code actually works. It
-      // requires that the upstream batches got to a safe place to run
-      // out of memory and that no work was in-flight and thus abandoned.
-      // Consider removing this case once resource management is in place.
-
-      logger.error("received OUT_OF_MEMORY, trying to spill");
-      if (! sortImpl.forceSpill()) {
-        throw UserException.memoryError("Received OUT_OF_MEMORY, but not enough batches to spill")
-          .build(logger);
-      }
-      break;
     default:
       throw new IllegalStateException("Unexpected iter outcome: " + lastKnownOutcome);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 84afa3b..49cbb46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -79,11 +79,6 @@
       return false;
     }
 
-    if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return false;
-    }
-
     if (checkForEarlyFinish(leftOutcome, rightOutcome)) {
       state = BatchState.DONE;
       return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 73f03f8..1548ec5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,7 +20,6 @@
 import java.util.Iterator;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -91,8 +90,6 @@
     NOT_FIRST,
     /** The query most likely failed, we need to propagate STOP to the root. */
     STOP,
-    /** Out of Memory while building the Schema...Ouch! */
-    OUT_OF_MEMORY,
     /** All work is done, no more data to be sent. */
     DONE
   }
@@ -165,11 +162,6 @@
             case DONE:
               lastOutcome = IterOutcome.NONE;
               break;
-            case OUT_OF_MEMORY:
-              // because we don't support schema changes, it is safe to fail the query right away
-              context.getExecutorState().fail(UserException.memoryError()
-                .build(logger));
-              // FALL-THROUGH
             case STOP:
               lastOutcome = IterOutcome.STOP;
               break;
@@ -253,7 +245,9 @@
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(String.format(
+        "You should not call getOutgoingContainer() for class %s",
+        getClass().getCanonicalName()));
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index 808f6ed..78acb10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -38,7 +38,6 @@
 public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
   private static final Logger logger = LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
 
-  protected boolean outOfMemory;
   protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private IterOutcome lastKnownOutcome;
 
@@ -90,8 +89,6 @@
           container.buildSchema(SelectionVectorMode.NONE);
         }
         return upstream;
-      case OUT_OF_MEMORY:
-        return upstream;
       case OK_NEW_SCHEMA:
         if (state == BatchState.FIRST) {
           state = BatchState.NOT_FIRST;
@@ -122,11 +119,6 @@
           upstream = out;
         }
 
-        if (outOfMemory) {
-          outOfMemory = false;
-          return IterOutcome.OUT_OF_MEMORY;
-        }
-
         // Check if schema has changed
         if (callBack.getSchemaChangedAndReset()) {
           return IterOutcome.OK_NEW_SCHEMA;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 7473c8c..bdb8341 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -60,7 +60,7 @@
    *   </li>
    * </ul>
    * <p>
-   *  <strong>Details</strong>:
+   *  <h4>Details</h4>
    * </p>
    * <p>
    *   For normal completion, the basic sequence of return values from calls to
@@ -80,8 +80,8 @@
    *   </li>
    * </ol>
    * <p>
-   *   In addition to that basic sequence, {@link #NOT_YET} and
-   *   {@link #OUT_OF_MEMORY} values can appear anywhere in the subsequence
+   *   In addition to that basic sequence, {@link #NOT_YET}
+   *   values can appear anywhere in the subsequence
    *   before the terminal value ({@code NONE} or {@code STOP}).
    * </p>
    * <p>
@@ -91,16 +91,20 @@
    *   and that does not contain {@code NONE}, and ends with {@code STOP}.
    * </p>
    * <p>
-   *   (The normal-completion return sequence is matched by the following
+   *   The normal-completion return sequence is matched by the following
    *   regular-expression-style grammar:
    *   <pre>
-   *     ( ( NOT_YET | OUT_OF_MEMORY )*  OK_NEW_SCHEMA
-   *       ( NOT_YET | OUT_OF_MEMORY )*  OK )*
+   *     ( NOT_YET*  OK_NEW_SCHEMA
+   *       NOT_YET*  OK )*
    *     )+
-   *     ( NOT_YET | OUT_OF_+MEMORY )*  NONE
-   *   </pre>
-   *   )
+   *     NOT_YET*    NONE</pre>
    * </p>
+   * <h4>Obsolete Outcomes</h4>
+   *
+   * The former <tt>OUT_OF_MEMORY</tt> state was never really used.
+   * It is now handled by calling
+   * {@link FragmentContext#requestMemory()}
+   * at the point that the operator realizes it is short on memory.
    */
   enum IterOutcome {
     /**
@@ -195,20 +199,6 @@
     NOT_YET(false),
 
     /**
-     * Out of memory (not fatal).
-     * <p>
-     *   The call to {@link #next()},
-     *   including upstream operators, was unable to allocate memory
-     *   and did not read any records,
-     *   and the batch will have more results to return (at least completion or
-     *     abnormal termination ({@code NONE} or {@code STOP})).
-     *   The caller should release memory if it can (including by returning
-     *     {@code OUT_OF_MEMORY} to its caller) and call {@code next()} again.
-     * </p>
-     */
-    OUT_OF_MEMORY(true),
-
-    /**
      * Emit record to produce output batches.
      * <p>
      *   The call to {@link #next()},
@@ -233,7 +223,7 @@
      */
     EMIT(false);
 
-    private boolean error;
+    private final boolean error;
 
     IterOutcome(boolean error) {
       this.error = error;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index e03b17c..91c1b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -240,8 +240,6 @@
             rbd.clear();
           }
           break;
-        case OUT_OF_MEMORY:
-          return lastOutcome;
         case NOT_YET:
         default:
           throw new UnsupportedOperationException("Unsupported outcome received " + lastOutcome);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 2d1a1ba..d8e3b2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -256,7 +256,6 @@
         return currentOutcome;
       case NONE:
       case STOP:
-      case OUT_OF_MEMORY:
         isDone = true;
       case NOT_YET:
         container.setRecordCount(0);
@@ -328,7 +327,6 @@
 
     public Builder terminateWithError(IterOutcome errorOutcome) {
       Preconditions.checkArgument(errorOutcome != IterOutcome.STOP);
-      Preconditions.checkArgument(errorOutcome != IterOutcome.OUT_OF_MEMORY);
 
       iterOutcomes.add(errorOutcome);
       return this;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 3eba157..0db2d3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -55,7 +55,6 @@
 
 @Category(OperatorTest.class)
 public class TestLateralJoinCorrectness extends SubOperatorTest {
-  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestNestedNotYet.class);
 
   // Operator Context for mock batch
   private static OperatorContext operatorContext;
@@ -155,8 +154,7 @@
    * @return
    */
   private boolean isTerminal(RecordBatch.IterOutcome outcome) {
-    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP)
-      || (outcome == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
   }
 
   /**
@@ -1361,98 +1359,6 @@
     }
   }
 
-  @Test
-  public void testHandlingOOMFromLeft() throws Exception {
-    // Get the left container with dummy data for Lateral Join
-    leftContainer.add(nonEmptyLeftRowSet.container());
-
-    // Get the left IterOutcomes for Lateral Join
-    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    leftOutcomes.add(RecordBatch.IterOutcome.OUT_OF_MEMORY);
-
-    // Create Left MockRecordBatch
-    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
-      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
-
-    // Get the right container with dummy data
-    rightContainer.add(emptyRightRowSet.container());
-    rightContainer.add(nonEmptyRightRowSet.container());
-
-    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
-
-    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
-      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
-
-    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
-      leftMockBatch, rightMockBatch);
-
-    try {
-      int totalRecordCount = 0;
-      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
-
-      // 1st output batch
-      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
-      totalRecordCount += ljBatch.getRecordCount();
-
-      // 2nd output batch
-      assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next());
-
-      // Compare the total records generated in 2 output batches with expected count.
-      assertTrue(totalRecordCount ==
-        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()));
-    } catch (AssertionError | Exception error) {
-      fail();
-    } finally {
-      // Close all the resources for this test case
-      ljBatch.close();
-      leftMockBatch.close();
-      rightMockBatch.close();
-    }
-  }
-
-  @Test
-  public void testHandlingOOMFromRight() throws Exception {
-    // Get the left container with dummy data for Lateral Join
-    leftContainer.add(nonEmptyLeftRowSet.container());
-
-    // Get the left IterOutcomes for Lateral Join
-    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    leftOutcomes.add(RecordBatch.IterOutcome.OK);
-
-    // Create Left MockRecordBatch
-    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
-      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
-
-    // Get the right container with dummy data
-    rightContainer.add(emptyRightRowSet.container());
-    rightContainer.add(nonEmptyRightRowSet.container());
-
-    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes.add(RecordBatch.IterOutcome.OUT_OF_MEMORY);
-
-    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
-      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
-
-    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
-      leftMockBatch, rightMockBatch);
-
-    try {
-      // int totalRecordCount = 0;
-      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
-
-      // 2nd output batch
-      assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next());
-    } catch (AssertionError | Exception error) {
-      fail();
-    } finally {
-      // Close all the resources for this test case
-      ljBatch.close();
-      leftMockBatch.close();
-      rightMockBatch.close();
-    }
-  }
-
   /**
    * Test to check basic left lateral join is working correctly or not. We create a left batch with one and
    * corresponding right batch with zero rows and check if output still get's populated with left side of data or not.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index aefa28a..adfb107 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -45,7 +45,7 @@
  */
 public class MockLateralJoinBatch implements LateralContract, CloseableRecordBatch {
 
-  private RecordBatch incoming;
+  private final RecordBatch incoming;
 
   private int recordIndex = 0;
   private RecordBatch unnest;
@@ -57,7 +57,7 @@
   private final FragmentContext context;
   private  final OperatorContext oContext;
 
-  private List<ValueVector> resultList = new ArrayList<>();
+  private final List<ValueVector> resultList = new ArrayList<>();
 
   public MockLateralJoinBatch(FragmentContext context, OperatorContext oContext, RecordBatch incoming) {
     this.context = context;
@@ -98,6 +98,7 @@
     return unnest;
   }
 
+  @Override
   public IterOutcome next() {
 
     IterOutcome currentOutcome = incoming.next();
@@ -139,7 +140,6 @@
         return currentOutcome;
       case NONE:
       case STOP:
-      case OUT_OF_MEMORY:
         isDone = true;
         return currentOutcome;
       case NOT_YET:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index d69524c..bec4683 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -75,10 +75,10 @@
   public void testUnnestFixedWidthColumn() {
 
     Object[][] data = {
-        { (Object) new int[] {1, 2},
-          (Object) new int[] {3, 4, 5}},
-        { (Object) new int[] {6, 7, 8, 9},
-          (Object) new int[] {10, 11, 12, 13, 14}}
+        { new int[] {1, 2},
+          new int[] {3, 4, 5}},
+        { new int[] {6, 7, 8, 9},
+          new int[] {10, 11, 12, 13, 14}}
     };
 
     // Create input schema
@@ -107,10 +107,10 @@
   public void testUnnestVarWidthColumn() {
 
     Object[][] data = {
-        { (Object) new String[] {"", "zero"},
-          (Object) new String[] {"one", "two", "three"}},
-        { (Object) new String[] {"four", "five", "six", "seven"},
-          (Object) new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
+        { new String[] {"", "zero"},
+          new String[] {"one", "two", "three"}},
+        { new String[] {"four", "five", "six", "seven"},
+          new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
     };
 
     // Create input schema
@@ -162,11 +162,11 @@
   public void testUnnestEmptyList() {
 
     Object[][] data = {
-        { (Object) new String[] {},
-          (Object) new String[] {}
+        { new String[] {},
+          new String[] {}
         },
-        { (Object) new String[] {},
-          (Object) new String[] {}
+        { new String[] {},
+          new String[] {}
         }
     };
 
@@ -197,14 +197,14 @@
     // unnest column itself has changed
     Object[][] data = {
         {
-            (Object) new String[] {"0", "1"},
-            (Object) new String[] {"2", "3", "4"}
+            new String[] {"0", "1"},
+            new String[] {"2", "3", "4"}
         },
         {
-            (Object) new String[] {"5", "6" },
+            new String[] {"5", "6" },
         },
         {
-            (Object) new String[] {"9"}
+            new String[] {"9"}
         }
     };
 
@@ -240,14 +240,14 @@
   public void testUnnestSchemaChange() {
     Object[][] data = {
         {
-            (Object) new String[] {"0", "1"},
-            (Object) new String[] {"2", "3", "4"}
+            new String[] {"0", "1"},
+            new String[] {"2", "3", "4"}
         },
         {
-            (Object) new String[] {"5", "6" },
+            new String[] {"5", "6" },
         },
         {
-            (Object) new int[] {9}
+            new int[] {9}
         }
     };
 
@@ -463,10 +463,10 @@
   public void testUnnestNonArrayColumn() {
 
     Object[][] data = {
-        { (Object) new Integer (1),
-            (Object) new Integer (3)},
-        { (Object) new Integer (6),
-            (Object) new Integer (10)}
+        { new Integer (1),
+            new Integer (3)},
+        { new Integer (6),
+            new Integer (10)}
     };
 
     // Create input schema
@@ -584,7 +584,7 @@
         for (int j = 0; j < valueCount; j++) {
 
           if (vv instanceof MapVector) {
-            if (!compareMapBaseline((Object) baseline[i][j], vv.getAccessor().getObject(j))) {
+            if (!compareMapBaseline(baseline[i][j], vv.getAccessor().getObject(j))) {
               fail("Test failed in validating unnest(Map) output. Value mismatch");
             }
           } else if (vv instanceof VarCharVector) {
@@ -606,7 +606,7 @@
 
       }
 
-      assertTrue(((MockLateralJoinBatch) lateralJoinBatch).isCompleted());
+      assertTrue(lateralJoinBatch.isCompleted());
 
     } catch (UserException e) {
       throw e; // Valid exception
@@ -716,8 +716,7 @@
   }
 
   private boolean isTerminal(RecordBatch.IterOutcome outcome) {
-    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP) || (outcome
-        == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
   }
 
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 682b878..8969265 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -84,10 +84,10 @@
   public void testUnnestFixedWidthColumn() {
 
     Object[][] data = {
-        { (Object) new int[] {1, 2},
-          (Object) new int[] {3, 4, 5}},
-        { (Object) new int[] {6, 7, 8, 9},
-          (Object) new int[] {10, 11, 12, 13, 14}}
+        { new int[] {1, 2},
+          new int[] {3, 4, 5}},
+        { new int[] {6, 7, 8, 9},
+          new int[] {10, 11, 12, 13, 14}}
     };
 
     // Create input schema
@@ -119,10 +119,10 @@
   public void testUnnestVarWidthColumn() {
 
     Object[][] data = {
-        { (Object) new String[] {"", "zero"},
-          (Object) new String[] {"one", "two", "three"}},
-        { (Object) new String[] {"four", "five", "six", "seven"},
-          (Object) new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
+        { new String[] {"", "zero"},
+          new String[] {"one", "two", "three"}},
+        { new String[] {"four", "five", "six", "seven"},
+          new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
     };
 
     // Create input schema
@@ -174,11 +174,11 @@
   public void testUnnestEmptyList() {
 
     Object[][] data = {
-        { (Object) new String[] {},
-          (Object) new String[] {}
+        { new String[] {},
+          new String[] {}
         },
-        { (Object) new String[] {},
-          (Object) new String[] {}
+        { new String[] {},
+          new String[] {}
         }
     };
 
@@ -208,14 +208,14 @@
     // unnest column itself has changed
     Object[][] data = {
         {
-            (Object) new String[] {"0", "1"},
-            (Object) new String[] {"2", "3", "4"}
+            new String[] {"0", "1"},
+            new String[] {"2", "3", "4"}
         },
         {
-            (Object) new String[] {"5", "6" },
+            new String[] {"5", "6" },
         },
         {
-            (Object) new String[] {"9"}
+            new String[] {"9"}
         }
     };
 
@@ -253,14 +253,14 @@
   public void testUnnestSchemaChange() {
     Object[][] data = {
         {
-            (Object) new String[] {"0", "1"},
-            (Object) new String[] {"2", "3", "4"}
+            new String[] {"0", "1"},
+            new String[] {"2", "3", "4"}
         },
         {
-            (Object) new String[] {"5", "6" },
+            new String[] {"5", "6" },
         },
         {
-            (Object) new int[] {9}
+            new int[] {9}
         }
     };
 
@@ -502,10 +502,10 @@
   public void testUnnestNonArrayColumn() {
 
     Object[][] data = {
-        { (Object) new Integer (1),
-            (Object) new Integer (3)},
-        { (Object) new Integer (6),
-            (Object) new Integer (10)}
+        { new Integer (1),
+            new Integer (3)},
+        { new Integer (6),
+            new Integer (10)}
     };
 
     // Create input schema
@@ -668,7 +668,7 @@
               if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
                 fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
                     + "]" + ": "
-                    + ((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "   VV.getObject(valueIndex): " + val);
+                    + baseline[batchIndex][vectorIndex][valueIndex] + "   VV.getObject(valueIndex): " + val);
               }
             }
           }
@@ -824,8 +824,7 @@
   }
 
   private boolean isTerminal(RecordBatch.IterOutcome outcome) {
-    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP) || (outcome
-        == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+    return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
   }
 
 
@@ -985,7 +984,7 @@
               if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
                 fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
                     + "]" + ": "
-                    + ((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + "   VV.getObject(valueIndex): " + val);
+                    + baseline[batchIndex][vectorIndex][valueIndex] + "   VV.getObject(valueIndex): " + val);
               }
             }
           }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index ed4c521..1e2df25 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -54,6 +54,7 @@
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -171,8 +172,8 @@
     private final List<OperatorContext> contexts = Lists.newLinkedList();
 
 
-    private ExecutorState executorState = new OperatorFixture.MockExecutorState();
-    private ExecutionControls controls;
+    private final ExecutorState executorState = new OperatorFixture.MockExecutorState();
+    private final ExecutionControls controls;
 
     public MockFragmentContext(final DrillConfig config,
                                final OptionManager options,
@@ -339,6 +340,11 @@
     public MetastoreRegistry getMetastoreRegistry() {
       return null;
     }
+
+    @Override
+    public void requestMemory(RecordBatch requestor) {
+      // Does nothing in a mock fragment.
+    }
   }
 
   private final SystemOptionManager options;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index d02bdbf..07d8e03 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -129,7 +129,7 @@
 
   protected static class BatchIterator implements Iterable<VectorAccessible> {
 
-    private RecordBatch operator;
+    private final RecordBatch operator;
     public BatchIterator(RecordBatch operator) {
       this.operator = operator;
     }
@@ -148,8 +148,6 @@
           if (lastResultOutcome == RecordBatch.IterOutcome.NONE
             || lastResultOutcome == RecordBatch.IterOutcome.STOP) {
             return false;
-          } else if (lastResultOutcome == RecordBatch.IterOutcome.OUT_OF_MEMORY) {
-            throw new RuntimeException("Operator ran out of memory");
           } else {
             return true;
           }