DRILL-7487: Removes the unused OUT_OF_MEMORY iterator status
See JIRA ticket for full explanation.
closes #1930
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 414d583..2107094 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -36,6 +36,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
@@ -201,6 +202,15 @@
*/
MetastoreRegistry getMetastoreRegistry();
+ /**
+ * An operator is experiencing memory pressure. Asks the fragment
+ * executor to poll all operators to release an optional memory
+ * (such as by spilling.) The request is advisory. The caller should
+ * again try to allocate memory, and if the second request fails,
+ * throw an <code>OutOfMemoryException</code>.
+ */
+ void requestMemory(RecordBatch requestor);
+
interface ExecutorState {
/**
* Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index cb35318..653241c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -54,6 +54,7 @@
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.UserClientConnection;
@@ -74,36 +75,46 @@
import org.apache.drill.metastore.MetastoreRegistry;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * <p>
- * This is the core Context which implements all the Context interfaces:
+ * This is the core Context which implements all the Context interfaces:
*
- * <ul>
- * <li>{@link FragmentContext}: A context provided to non-exchange operators.</li>
- * <li>{@link ExchangeFragmentContext}: A context provided to exchange operators.</li>
- * <li>{@link RootFragmentContext}: A context provided to fragment roots.</li>
- * <li>{@link ExecutorFragmentContext}: A context used by the Drillbit.</li>
- * </ul>
+ * <ul>
+ * <li>{@link FragmentContext}: A context provided to non-exchange
+ * operators.</li>
+ * <li>{@link ExchangeFragmentContext}: A context provided to exchange
+ * operators.</li>
+ * <li>{@link RootFragmentContext}: A context provided to fragment roots.</li>
+ * <li>{@link ExecutorFragmentContext}: A context used by the Drillbit.</li>
+ * </ul>
*
- * The interfaces above expose resources to varying degrees. They are ordered from most restrictive ({@link FragmentContext})
- * to least restrictive ({@link ExecutorFragmentContext}).
+ * The interfaces above expose resources to varying degrees. They are ordered
+ * from most restrictive ({@link FragmentContext}) to least restrictive
+ * ({@link ExecutorFragmentContext}).
* </p>
* <p>
- * Since {@link FragmentContextImpl} implements all of the interfaces listed above, the facade pattern is used in order
- * to cast a {@link FragmentContextImpl} object to the desired interface where-ever it is needed. The facade pattern
- * is powerful since it allows us to easily create minimal context objects to be used in unit tests. Without
- * the use of interfaces and the facade pattern we would have to create a complete {@link FragmentContextImpl} object
- * to unit test any part of the code that depends on a context.
+ * Since {@link FragmentContextImpl} implements all of the interfaces listed
+ * above, the facade pattern is used in order to cast a
+ * {@link FragmentContextImpl} object to the desired interface where-ever it is
+ * needed. The facade pattern is powerful since it allows us to easily create
+ * minimal context objects to be used in unit tests. Without the use of
+ * interfaces and the facade pattern we would have to create a complete
+ * {@link FragmentContextImpl} object to unit test any part of the code that
+ * depends on a context.
* </p>
* <p>
- * <b>General guideline:</b> Use the most narrow interface for the task. For example, "internal" operators don't need visibility to the networking functionality.
- * Using the narrow interface allows unit testing without using mocking libraries. Often, the surrounding structure already has exposed the most narrow interface. If there are
- * opportunities to clean up older code, we can do so as needed to make testing easier.
+ * <b>General guideline:</b> Use the most narrow interface for the task. For
+ * example, "internal" operators don't need visibility to the networking
+ * functionality. Using the narrow interface allows unit testing without using
+ * mocking libraries. Often, the surrounding structure already has exposed the
+ * most narrow interface. If there are opportunities to clean up older code, we
+ * can do so as needed to make testing easier.
* </p>
*/
public class FragmentContextImpl extends BaseFragmentContext implements ExecutorFragmentContext {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContextImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(FragmentContextImpl.class);
private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = new HashMap<>();
private final List<OperatorContextImpl> contexts = new LinkedList<>();
@@ -120,8 +131,8 @@
private final BufferManager bufferManager;
private ExecutorState executorState;
private final ExecutionControls executionControls;
- private boolean enableRuntimeFilter;
- private boolean enableRFWaiting;
+ private final boolean enableRuntimeFilter;
+ private final boolean enableRFWaiting;
private Lock lock4RF;
private Condition condition4RF;
@@ -145,8 +156,8 @@
private final AccountingUserConnection accountingUserConnection;
/** Stores constants and their holders by type */
private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
- private Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
- private Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
+ private final Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new ConcurrentHashMap<>();
+ private final Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
/**
* Create a FragmentContext instance for non-root fragment.
@@ -625,4 +636,15 @@
public MetastoreRegistry getMetastoreRegistry() {
return context.getMetastoreRegistry();
}
+
+ @Override
+ public void requestMemory(RecordBatch requestor) {
+ // Does not actually do anything yet. Should ask the fragment
+ // executor to talk the tree, except for the given batch,
+ // and request the operator free up memory. Then, if the
+ // requestor is above its allocator limit, increase that
+ // limit. Since this operation is purely optional, we leave
+ // it as a stub for now. (No operator ever actually used the
+ // old OUT_OF_MEMORY iterator status that this method replaces.)
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2ea8d08..b778937 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -83,8 +83,6 @@
IterOutcome outcome = next(incoming);
logger.trace("Screen Outcome {}", outcome);
switch (outcome) {
- case OUT_OF_MEMORY:
- throw new OutOfMemoryException();
case STOP:
return false;
case NONE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 55e7cd5..c23ac44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -49,10 +49,10 @@
private final FragmentHandle oppositeHandle;
- private RecordBatch incoming;
+ private final RecordBatch incoming;
private AccountingDataTunnel tunnel;
- private FragmentHandle handle;
- private int recMajor;
+ private final FragmentHandle handle;
+ private final int recMajor;
private volatile boolean ok = true;
private volatile boolean done = false;
@@ -95,10 +95,7 @@
incoming.kill(true);
out = IterOutcome.NONE;
}
-// logger.debug("Outcome of sender next {}", out);
switch (out) {
- case OUT_OF_MEMORY:
- throw new OutOfMemoryException();
case STOP:
case NONE:
// if we didn't do anything yet, send an empty schema.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
index 6cad071..93aadc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/StatisticsWriterRecordBatch.java
@@ -96,7 +96,6 @@
upstream = next(incoming);
switch(upstream) {
- case OUT_OF_MEMORY:
case STOP:
return upstream;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index baef314..555b78d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -94,7 +94,7 @@
private BatchSchema schema;
private boolean schemaChanged = false;
private PriorityQueue priorityQueue;
- private TopN config;
+ private final TopN config;
private SelectionVector4 sv4;
private long countSincePurge;
private int batchCount;
@@ -156,9 +156,6 @@
case STOP:
state = BatchState.STOP;
return;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- return;
case NONE:
state = BatchState.DONE;
return;
@@ -221,7 +218,6 @@
break outer;
case NOT_YET:
throw new UnsupportedOperationException();
- case OUT_OF_MEMORY:
case STOP:
return lastKnownOutcome;
case OK_NEW_SCHEMA:
@@ -663,7 +659,7 @@
}
public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
- private SelectionVector4 sv4;
+ private final SelectionVector4 sv4;
public SimpleSV4RecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
super(container, context);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 4b63946..60a6cf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -90,7 +90,6 @@
upstream = next(incoming);
switch(upstream) {
- case OUT_OF_MEMORY:
case STOP:
return upstream;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 38fb14e..615c8ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -99,7 +99,7 @@
private boolean firstBatch = true;
// This map saves the mapping between outgoing column and incoming column.
- private Map<String, String> columnMapping;
+ private final Map<String, String> columnMapping;
private final HashAggMemoryManager hashAggMemoryManager;
private final GeneratorMapping UPDATE_AGGR_INSIDE =
@@ -252,9 +252,6 @@
state = BatchState.DONE;
container.buildSchema(SelectionVectorMode.NONE);
return;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- return;
case STOP:
state = BatchState.STOP;
return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 7fcfd99..1c2cd85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -649,7 +649,6 @@
}
// Handle various results from getting the next batch
switch (outcome) {
- case OUT_OF_MEMORY:
case NOT_YET:
return AggOutcome.RETURN_OUTCOME;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 586fa32..9e9a097 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -172,9 +172,6 @@
state = BatchState.DONE;
container.buildSchema(SelectionVectorMode.NONE);
return;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- return;
case STOP:
state = BatchState.STOP;
return;
@@ -242,7 +239,6 @@
return IterOutcome.OK;
}
// else fall thru
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
return lastKnownOutcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 173ed6a..1ada0ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,9 +136,6 @@
currentIndex = this.getVectorIndex(underlyingIndex);
break outer;
}
- case OUT_OF_MEMORY:
- outcome = out;
- return AggOutcome.RETURN_OUTCOME;
case EMIT:
outerOutcome = EMIT;
if (incoming.getRecordCount() == 0) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 4207ef4..be60982 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -95,8 +95,6 @@
RecordBatch.IterOutcome out = next(incoming);
logger.debug("Outcome of sender next {}", out);
switch(out){
- case OUT_OF_MEMORY:
- throw new OutOfMemoryException();
case STOP:
case NONE:
for (int i = 0; i < tunnels.length; ++i) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index cded844..18af718 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -453,10 +453,7 @@
isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
- if (outcome == IterOutcome.OUT_OF_MEMORY) {
- // We reached a termination state
- state = BatchState.OUT_OF_MEMORY;
- } else if (outcome == IterOutcome.STOP) {
+ if (outcome == IterOutcome.STOP) {
// We reached a termination state
state = BatchState.STOP;
} else {
@@ -563,11 +560,6 @@
state = BatchState.STOP;
return leftUpstream;
}
-
- if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
- state = BatchState.OUT_OF_MEMORY;
- return leftUpstream;
- }
}
// Update the hash table related stats for the operator
@@ -1009,7 +1001,6 @@
boolean moreData = true;
while (moreData) {
switch (rightUpstream) {
- case OUT_OF_MEMORY:
case NONE:
case NOT_YET:
case STOP:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index d53fb36..00c9363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -53,7 +53,6 @@
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
/**
@@ -413,7 +412,7 @@
}
private boolean isTerminalOutcome(IterOutcome outcome) {
- return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
+ return (outcome == STOP || outcome == NONE);
}
/**
@@ -477,7 +476,6 @@
leftJoinIndex = 0;
}
break;
- case OUT_OF_MEMORY:
case NONE:
case STOP:
// Not using =0 since if outgoing container is empty then no point returning anything
@@ -547,7 +545,6 @@
rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
needNewRightBatch = false;
break;
- case OUT_OF_MEMORY:
case NONE:
case STOP:
needNewRightBatch = false;
@@ -925,9 +922,6 @@
case EMIT:
state = BatchState.STOP;
return false;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- return false;
case NONE:
case NOT_YET:
state = BatchState.DONE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index c84f954..2dd5973 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -177,8 +177,6 @@
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
addBatchToHyperContainer(right);
break;
- case OUT_OF_MEMORY:
- return IterOutcome.OUT_OF_MEMORY;
case NONE:
case STOP:
//TODO we got a STOP, shouldn't we stop immediately ?
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 2910da5..e580dfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -52,7 +52,7 @@
private IterOutcome rightUpstream = IterOutcome.NONE;
private final List<TransferPair> transfers = Lists.newArrayList();
private int recordCount = 0;
- private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private RowKeyJoinState rkJoinState = RowKeyJoinState.INITIAL;
public RowKeyJoinBatch(RowKeyJoinPOP config, FragmentContext context, RecordBatch left, RecordBatch right)
@@ -100,11 +100,6 @@
leftUpstream = next(left);
- if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
- state = BatchState.OUT_OF_MEMORY;
- return;
- }
-
for (final VectorWrapper<?> v : left) {
final TransferPair pair = v.getValueVector().makeTransferPair(
container.addOrGet(v.getField(), callBack));
@@ -143,7 +138,6 @@
switch(rightUpstream) {
case NONE:
- case OUT_OF_MEMORY:
case STOP:
rkJoinState = RowKeyJoinState.DONE;
state = BatchState.DONE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 25c79ac..0bbaba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -64,9 +64,6 @@
incoming.kill(true);
IterOutcome upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
// Clear the memory for the incoming batch
@@ -78,9 +75,6 @@
incomingSv.clear();
}
upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
}
// If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
if (upStream == EMIT) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 73a55b0..15b103d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -159,7 +159,6 @@
// all incoming data was processed when returned OK_NEW_SCHEMA
finishedLeft = !firstLeft;
break outer;
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
return outcome;
@@ -210,7 +209,6 @@
// all incoming data was processed
finishedRight = true;
break outer;
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
return outcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 5c45150..c302ef2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -124,7 +124,6 @@
assert !firstBatch : "First batch should be OK_NEW_SCHEMA";
doWorkInternal();
// fall thru
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
return outcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5aed88e..6d495a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -61,11 +61,11 @@
public class PartitionSenderRootExec extends BaseRootExec {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
- private RecordBatch incoming;
- private HashPartitionSender operator;
+ private final RecordBatch incoming;
+ private final HashPartitionSender operator;
private PartitionerDecorator partitioner;
- private ExchangeFragmentContext context;
+ private final ExchangeFragmentContext context;
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final double cost;
@@ -74,14 +74,14 @@
private final AtomicInteger remaingReceiverCount;
private boolean done = false;
private boolean first = true;
- private boolean closeIncoming;
+ private final boolean closeIncoming;
long minReceiverRecordCount = Long.MAX_VALUE;
long maxReceiverRecordCount = Long.MIN_VALUE;
protected final int numberPartitions;
protected final int actualPartitions;
- private IntArrayList terminations = new IntArrayList();
+ private final IntArrayList terminations = new IntArrayList();
public enum Metric implements MetricDef {
BATCHES_SENT,
@@ -175,9 +175,6 @@
}
return false;
- case OUT_OF_MEMORY:
- throw new OutOfMemoryException();
-
case STOP:
if (partitioner != null) {
partitioner.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 9385400..633920e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -133,9 +133,6 @@
case NONE:
stop = true;
break outer;
- case OUT_OF_MEMORY:
- queue.putFirst(RecordBatchDataWrapper.outOfMemory());
- return;
case STOP:
queue.putFirst(RecordBatchDataWrapper.failed());
return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 185b8f0..97afac0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -173,10 +173,7 @@
next = next(incoming);
setLastKnownOutcome(next);
- if (next == IterOutcome.OUT_OF_MEMORY) {
- outOfMemory = true;
- return next;
- } else if (next == IterOutcome.NONE) {
+ if (next == IterOutcome.NONE) {
// since this is first batch and we already got a NONE, need to set up the schema
doAlloc(0);
setValueCount(0);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index fcbb10e..a06f1d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -105,7 +105,6 @@
break outer;
case NOT_YET:
throw new UnsupportedOperationException();
- case OUT_OF_MEMORY:
case STOP:
return upstream;
case OK_NEW_SCHEMA:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 921c92b..104917e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -346,7 +346,6 @@
switch (outcome) {
case NONE:
break outer;
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
return outcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index ed2b66e..35f34b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -129,7 +129,6 @@
switch (upstream) {
case NONE:
- case OUT_OF_MEMORY:
case STOP:
return upstream;
case OK_NEW_SCHEMA:
@@ -391,7 +390,6 @@
batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
getRecordBatchStatsContext());
return Pair.of(outcome, topStatus);
- case OUT_OF_MEMORY:
case STOP:
batchStatusStack.pop();
return Pair.of(outcome, topStatus);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index f5fb77a..8b3c2e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -195,8 +195,10 @@
}
if (context.getAllocator().isOverLimit()) {
- lastOutcome = IterOutcome.OUT_OF_MEMORY;
- return lastOutcome;
+ context.requestMemory(this);
+ if (context.getAllocator().isOverLimit()) {
+ throw new OutOfMemoryException("Allocator over limit");
+ }
}
RecordBatchDef rbd = batch.getHeader().getDef();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 72a337a..3562e9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -112,7 +112,6 @@
// Process according to upstream outcome
switch (upStream) {
case NONE:
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
return upStream;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 3e7ae23..7e795dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -278,10 +278,9 @@
validationState = ValidationState.TERMINAL;
break;
case NOT_YET:
- case OUT_OF_MEMORY:
- // NOT_YET and OUT_OF_MEMORY are allowed at any time, except if
+ // NOT_YET is allowed at any time, except if
// terminated (checked above).
- // NOT_YET and OUT_OF_MEMORY OK don't change high-level state.
+ // NOT_YET doesn't change high-level state.
break;
default:
throw new AssertionError(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 07a5c76..29c4dad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -116,7 +116,6 @@
case NONE:
noMoreBatches = true;
break;
- case OUT_OF_MEMORY:
case NOT_YET:
case STOP:
cleanup();
@@ -237,9 +236,6 @@
case STOP:
state = BatchState.STOP;
return;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- return;
default:
break;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index c8fa175..38a8e78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -119,7 +119,7 @@
private boolean first = true;
private int targetRecordCount;
private final String fileName;
- private Set<Path> currSpillDirs = Sets.newTreeSet();
+ private final Set<Path> currSpillDirs = Sets.newTreeSet();
private int firstSpillBatchCount = 0;
private int peakNumBatches = -1;
@@ -279,9 +279,6 @@
case STOP:
state = BatchState.STOP;
break;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- break;
case NONE:
state = BatchState.DONE;
break;
@@ -436,19 +433,6 @@
}
}
break;
- case OUT_OF_MEMORY:
- logger.debug("received OUT_OF_MEMORY, trying to spill");
- if (batchesSinceLastSpill > 2) {
- final BatchGroup merged = mergeAndSpill(batchGroups);
- if (merged != null) {
- spilledBatchGroups.add(merged);
- batchesSinceLastSpill = 0;
- }
- } else {
- logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
- return IterOutcome.OUT_OF_MEMORY;
- }
- break;
default:
throw new UnsupportedOperationException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 32c7119..25c3eda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -288,9 +288,6 @@
case STOP:
state = BatchState.STOP;
break;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- break;
case NONE:
state = BatchState.DONE;
break;
@@ -427,19 +424,6 @@
sortImpl.addBatch(incoming);
break;
- case OUT_OF_MEMORY:
-
- // Note: it is highly doubtful that this code actually works. It
- // requires that the upstream batches got to a safe place to run
- // out of memory and that no work was in-flight and thus abandoned.
- // Consider removing this case once resource management is in place.
-
- logger.error("received OUT_OF_MEMORY, trying to spill");
- if (! sortImpl.forceSpill()) {
- throw UserException.memoryError("Received OUT_OF_MEMORY, but not enough batches to spill")
- .build(logger);
- }
- break;
default:
throw new IllegalStateException("Unexpected iter outcome: " + lastKnownOutcome);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 84afa3b..49cbb46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -79,11 +79,6 @@
return false;
}
- if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) {
- state = BatchState.OUT_OF_MEMORY;
- return false;
- }
-
if (checkForEarlyFinish(leftOutcome, rightOutcome)) {
state = BatchState.DONE;
return false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 73f03f8..1548ec5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,7 +20,6 @@
import java.util.Iterator;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -91,8 +90,6 @@
NOT_FIRST,
/** The query most likely failed, we need to propagate STOP to the root. */
STOP,
- /** Out of Memory while building the Schema...Ouch! */
- OUT_OF_MEMORY,
/** All work is done, no more data to be sent. */
DONE
}
@@ -165,11 +162,6 @@
case DONE:
lastOutcome = IterOutcome.NONE;
break;
- case OUT_OF_MEMORY:
- // because we don't support schema changes, it is safe to fail the query right away
- context.getExecutorState().fail(UserException.memoryError()
- .build(logger));
- // FALL-THROUGH
case STOP:
lastOutcome = IterOutcome.STOP;
break;
@@ -253,7 +245,9 @@
@Override
public VectorContainer getOutgoingContainer() {
- throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ throw new UnsupportedOperationException(String.format(
+ "You should not call getOutgoingContainer() for class %s",
+ getClass().getCanonicalName()));
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index 808f6ed..78acb10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -38,7 +38,6 @@
public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
private static final Logger logger = LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
- protected boolean outOfMemory;
protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private IterOutcome lastKnownOutcome;
@@ -90,8 +89,6 @@
container.buildSchema(SelectionVectorMode.NONE);
}
return upstream;
- case OUT_OF_MEMORY:
- return upstream;
case OK_NEW_SCHEMA:
if (state == BatchState.FIRST) {
state = BatchState.NOT_FIRST;
@@ -122,11 +119,6 @@
upstream = out;
}
- if (outOfMemory) {
- outOfMemory = false;
- return IterOutcome.OUT_OF_MEMORY;
- }
-
// Check if schema has changed
if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 7473c8c..bdb8341 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -60,7 +60,7 @@
* </li>
* </ul>
* <p>
- * <strong>Details</strong>:
+ * <h4>Details</h4>
* </p>
* <p>
* For normal completion, the basic sequence of return values from calls to
@@ -80,8 +80,8 @@
* </li>
* </ol>
* <p>
- * In addition to that basic sequence, {@link #NOT_YET} and
- * {@link #OUT_OF_MEMORY} values can appear anywhere in the subsequence
+ * In addition to that basic sequence, {@link #NOT_YET}
+ * values can appear anywhere in the subsequence
* before the terminal value ({@code NONE} or {@code STOP}).
* </p>
* <p>
@@ -91,16 +91,20 @@
* and that does not contain {@code NONE}, and ends with {@code STOP}.
* </p>
* <p>
- * (The normal-completion return sequence is matched by the following
+ * The normal-completion return sequence is matched by the following
* regular-expression-style grammar:
* <pre>
- * ( ( NOT_YET | OUT_OF_MEMORY )* OK_NEW_SCHEMA
- * ( NOT_YET | OUT_OF_MEMORY )* OK )*
+ * ( NOT_YET* OK_NEW_SCHEMA
+ * NOT_YET* OK )*
* )+
- * ( NOT_YET | OUT_OF_+MEMORY )* NONE
- * </pre>
- * )
+ * NOT_YET* NONE</pre>
* </p>
+ * <h4>Obsolete Outcomes</h4>
+ *
+ * The former <tt>OUT_OF_MEMORY</tt> state was never really used.
+ * It is now handled by calling
+ * {@link FragmentContext#requestMemory()}
+ * at the point that the operator realizes it is short on memory.
*/
enum IterOutcome {
/**
@@ -195,20 +199,6 @@
NOT_YET(false),
/**
- * Out of memory (not fatal).
- * <p>
- * The call to {@link #next()},
- * including upstream operators, was unable to allocate memory
- * and did not read any records,
- * and the batch will have more results to return (at least completion or
- * abnormal termination ({@code NONE} or {@code STOP})).
- * The caller should release memory if it can (including by returning
- * {@code OUT_OF_MEMORY} to its caller) and call {@code next()} again.
- * </p>
- */
- OUT_OF_MEMORY(true),
-
- /**
* Emit record to produce output batches.
* <p>
* The call to {@link #next()},
@@ -233,7 +223,7 @@
*/
EMIT(false);
- private boolean error;
+ private final boolean error;
IterOutcome(boolean error) {
this.error = error;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index e03b17c..91c1b5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -240,8 +240,6 @@
rbd.clear();
}
break;
- case OUT_OF_MEMORY:
- return lastOutcome;
case NOT_YET:
default:
throw new UnsupportedOperationException("Unsupported outcome received " + lastOutcome);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 2d1a1ba..d8e3b2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -256,7 +256,6 @@
return currentOutcome;
case NONE:
case STOP:
- case OUT_OF_MEMORY:
isDone = true;
case NOT_YET:
container.setRecordCount(0);
@@ -328,7 +327,6 @@
public Builder terminateWithError(IterOutcome errorOutcome) {
Preconditions.checkArgument(errorOutcome != IterOutcome.STOP);
- Preconditions.checkArgument(errorOutcome != IterOutcome.OUT_OF_MEMORY);
iterOutcomes.add(errorOutcome);
return this;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 3eba157..0db2d3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -55,7 +55,6 @@
@Category(OperatorTest.class)
public class TestLateralJoinCorrectness extends SubOperatorTest {
- //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestNestedNotYet.class);
// Operator Context for mock batch
private static OperatorContext operatorContext;
@@ -155,8 +154,7 @@
* @return
*/
private boolean isTerminal(RecordBatch.IterOutcome outcome) {
- return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP)
- || (outcome == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+ return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
}
/**
@@ -1361,98 +1359,6 @@
}
}
- @Test
- public void testHandlingOOMFromLeft() throws Exception {
- // Get the left container with dummy data for Lateral Join
- leftContainer.add(nonEmptyLeftRowSet.container());
-
- // Get the left IterOutcomes for Lateral Join
- leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
- leftOutcomes.add(RecordBatch.IterOutcome.OUT_OF_MEMORY);
-
- // Create Left MockRecordBatch
- final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
- leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
-
- // Get the right container with dummy data
- rightContainer.add(emptyRightRowSet.container());
- rightContainer.add(nonEmptyRightRowSet.container());
-
- rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
- rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
-
- final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
- rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
-
- final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
- leftMockBatch, rightMockBatch);
-
- try {
- int totalRecordCount = 0;
- assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
-
- // 1st output batch
- assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
- totalRecordCount += ljBatch.getRecordCount();
-
- // 2nd output batch
- assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next());
-
- // Compare the total records generated in 2 output batches with expected count.
- assertTrue(totalRecordCount ==
- (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()));
- } catch (AssertionError | Exception error) {
- fail();
- } finally {
- // Close all the resources for this test case
- ljBatch.close();
- leftMockBatch.close();
- rightMockBatch.close();
- }
- }
-
- @Test
- public void testHandlingOOMFromRight() throws Exception {
- // Get the left container with dummy data for Lateral Join
- leftContainer.add(nonEmptyLeftRowSet.container());
-
- // Get the left IterOutcomes for Lateral Join
- leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
- leftOutcomes.add(RecordBatch.IterOutcome.OK);
-
- // Create Left MockRecordBatch
- final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
- leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
-
- // Get the right container with dummy data
- rightContainer.add(emptyRightRowSet.container());
- rightContainer.add(nonEmptyRightRowSet.container());
-
- rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
- rightOutcomes.add(RecordBatch.IterOutcome.OUT_OF_MEMORY);
-
- final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
- rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
-
- final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
- leftMockBatch, rightMockBatch);
-
- try {
- // int totalRecordCount = 0;
- assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
-
- // 2nd output batch
- assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next());
- } catch (AssertionError | Exception error) {
- fail();
- } finally {
- // Close all the resources for this test case
- ljBatch.close();
- leftMockBatch.close();
- rightMockBatch.close();
- }
- }
-
/**
* Test to check basic left lateral join is working correctly or not. We create a left batch with one and
* corresponding right batch with zero rows and check if output still get's populated with left side of data or not.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index aefa28a..adfb107 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -45,7 +45,7 @@
*/
public class MockLateralJoinBatch implements LateralContract, CloseableRecordBatch {
- private RecordBatch incoming;
+ private final RecordBatch incoming;
private int recordIndex = 0;
private RecordBatch unnest;
@@ -57,7 +57,7 @@
private final FragmentContext context;
private final OperatorContext oContext;
- private List<ValueVector> resultList = new ArrayList<>();
+ private final List<ValueVector> resultList = new ArrayList<>();
public MockLateralJoinBatch(FragmentContext context, OperatorContext oContext, RecordBatch incoming) {
this.context = context;
@@ -98,6 +98,7 @@
return unnest;
}
+ @Override
public IterOutcome next() {
IterOutcome currentOutcome = incoming.next();
@@ -139,7 +140,6 @@
return currentOutcome;
case NONE:
case STOP:
- case OUT_OF_MEMORY:
isDone = true;
return currentOutcome;
case NOT_YET:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index d69524c..bec4683 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -75,10 +75,10 @@
public void testUnnestFixedWidthColumn() {
Object[][] data = {
- { (Object) new int[] {1, 2},
- (Object) new int[] {3, 4, 5}},
- { (Object) new int[] {6, 7, 8, 9},
- (Object) new int[] {10, 11, 12, 13, 14}}
+ { new int[] {1, 2},
+ new int[] {3, 4, 5}},
+ { new int[] {6, 7, 8, 9},
+ new int[] {10, 11, 12, 13, 14}}
};
// Create input schema
@@ -107,10 +107,10 @@
public void testUnnestVarWidthColumn() {
Object[][] data = {
- { (Object) new String[] {"", "zero"},
- (Object) new String[] {"one", "two", "three"}},
- { (Object) new String[] {"four", "five", "six", "seven"},
- (Object) new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
+ { new String[] {"", "zero"},
+ new String[] {"one", "two", "three"}},
+ { new String[] {"four", "five", "six", "seven"},
+ new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
};
// Create input schema
@@ -162,11 +162,11 @@
public void testUnnestEmptyList() {
Object[][] data = {
- { (Object) new String[] {},
- (Object) new String[] {}
+ { new String[] {},
+ new String[] {}
},
- { (Object) new String[] {},
- (Object) new String[] {}
+ { new String[] {},
+ new String[] {}
}
};
@@ -197,14 +197,14 @@
// unnest column itself has changed
Object[][] data = {
{
- (Object) new String[] {"0", "1"},
- (Object) new String[] {"2", "3", "4"}
+ new String[] {"0", "1"},
+ new String[] {"2", "3", "4"}
},
{
- (Object) new String[] {"5", "6" },
+ new String[] {"5", "6" },
},
{
- (Object) new String[] {"9"}
+ new String[] {"9"}
}
};
@@ -240,14 +240,14 @@
public void testUnnestSchemaChange() {
Object[][] data = {
{
- (Object) new String[] {"0", "1"},
- (Object) new String[] {"2", "3", "4"}
+ new String[] {"0", "1"},
+ new String[] {"2", "3", "4"}
},
{
- (Object) new String[] {"5", "6" },
+ new String[] {"5", "6" },
},
{
- (Object) new int[] {9}
+ new int[] {9}
}
};
@@ -463,10 +463,10 @@
public void testUnnestNonArrayColumn() {
Object[][] data = {
- { (Object) new Integer (1),
- (Object) new Integer (3)},
- { (Object) new Integer (6),
- (Object) new Integer (10)}
+ { new Integer (1),
+ new Integer (3)},
+ { new Integer (6),
+ new Integer (10)}
};
// Create input schema
@@ -584,7 +584,7 @@
for (int j = 0; j < valueCount; j++) {
if (vv instanceof MapVector) {
- if (!compareMapBaseline((Object) baseline[i][j], vv.getAccessor().getObject(j))) {
+ if (!compareMapBaseline(baseline[i][j], vv.getAccessor().getObject(j))) {
fail("Test failed in validating unnest(Map) output. Value mismatch");
}
} else if (vv instanceof VarCharVector) {
@@ -606,7 +606,7 @@
}
- assertTrue(((MockLateralJoinBatch) lateralJoinBatch).isCompleted());
+ assertTrue(lateralJoinBatch.isCompleted());
} catch (UserException e) {
throw e; // Valid exception
@@ -716,8 +716,7 @@
}
private boolean isTerminal(RecordBatch.IterOutcome outcome) {
- return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP) || (outcome
- == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+ return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 682b878..8969265 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -84,10 +84,10 @@
public void testUnnestFixedWidthColumn() {
Object[][] data = {
- { (Object) new int[] {1, 2},
- (Object) new int[] {3, 4, 5}},
- { (Object) new int[] {6, 7, 8, 9},
- (Object) new int[] {10, 11, 12, 13, 14}}
+ { new int[] {1, 2},
+ new int[] {3, 4, 5}},
+ { new int[] {6, 7, 8, 9},
+ new int[] {10, 11, 12, 13, 14}}
};
// Create input schema
@@ -119,10 +119,10 @@
public void testUnnestVarWidthColumn() {
Object[][] data = {
- { (Object) new String[] {"", "zero"},
- (Object) new String[] {"one", "two", "three"}},
- { (Object) new String[] {"four", "five", "six", "seven"},
- (Object) new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
+ { new String[] {"", "zero"},
+ new String[] {"one", "two", "three"}},
+ { new String[] {"four", "five", "six", "seven"},
+ new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
};
// Create input schema
@@ -174,11 +174,11 @@
public void testUnnestEmptyList() {
Object[][] data = {
- { (Object) new String[] {},
- (Object) new String[] {}
+ { new String[] {},
+ new String[] {}
},
- { (Object) new String[] {},
- (Object) new String[] {}
+ { new String[] {},
+ new String[] {}
}
};
@@ -208,14 +208,14 @@
// unnest column itself has changed
Object[][] data = {
{
- (Object) new String[] {"0", "1"},
- (Object) new String[] {"2", "3", "4"}
+ new String[] {"0", "1"},
+ new String[] {"2", "3", "4"}
},
{
- (Object) new String[] {"5", "6" },
+ new String[] {"5", "6" },
},
{
- (Object) new String[] {"9"}
+ new String[] {"9"}
}
};
@@ -253,14 +253,14 @@
public void testUnnestSchemaChange() {
Object[][] data = {
{
- (Object) new String[] {"0", "1"},
- (Object) new String[] {"2", "3", "4"}
+ new String[] {"0", "1"},
+ new String[] {"2", "3", "4"}
},
{
- (Object) new String[] {"5", "6" },
+ new String[] {"5", "6" },
},
{
- (Object) new int[] {9}
+ new int[] {9}
}
};
@@ -502,10 +502,10 @@
public void testUnnestNonArrayColumn() {
Object[][] data = {
- { (Object) new Integer (1),
- (Object) new Integer (3)},
- { (Object) new Integer (6),
- (Object) new Integer (10)}
+ { new Integer (1),
+ new Integer (3)},
+ { new Integer (6),
+ new Integer (10)}
};
// Create input schema
@@ -668,7 +668,7 @@
if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
+ "]" + ": "
- + ((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + " VV.getObject(valueIndex): " + val);
+ + baseline[batchIndex][vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
}
}
}
@@ -824,8 +824,7 @@
}
private boolean isTerminal(RecordBatch.IterOutcome outcome) {
- return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP) || (outcome
- == RecordBatch.IterOutcome.OUT_OF_MEMORY);
+ return (outcome == RecordBatch.IterOutcome.NONE || outcome == RecordBatch.IterOutcome.STOP);
}
@@ -985,7 +984,7 @@
if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
+ "]" + ": "
- + ((Object[])baseline[batchIndex][vectorIndex])[valueIndex] + " VV.getObject(valueIndex): " + val);
+ + baseline[batchIndex][vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index ed4c521..1e2df25 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -54,6 +54,7 @@
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
@@ -171,8 +172,8 @@
private final List<OperatorContext> contexts = Lists.newLinkedList();
- private ExecutorState executorState = new OperatorFixture.MockExecutorState();
- private ExecutionControls controls;
+ private final ExecutorState executorState = new OperatorFixture.MockExecutorState();
+ private final ExecutionControls controls;
public MockFragmentContext(final DrillConfig config,
final OptionManager options,
@@ -339,6 +340,11 @@
public MetastoreRegistry getMetastoreRegistry() {
return null;
}
+
+ @Override
+ public void requestMemory(RecordBatch requestor) {
+ // Does nothing in a mock fragment.
+ }
}
private final SystemOptionManager options;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index d02bdbf..07d8e03 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -129,7 +129,7 @@
protected static class BatchIterator implements Iterable<VectorAccessible> {
- private RecordBatch operator;
+ private final RecordBatch operator;
public BatchIterator(RecordBatch operator) {
this.operator = operator;
}
@@ -148,8 +148,6 @@
if (lastResultOutcome == RecordBatch.IterOutcome.NONE
|| lastResultOutcome == RecordBatch.IterOutcome.STOP) {
return false;
- } else if (lastResultOutcome == RecordBatch.IterOutcome.OUT_OF_MEMORY) {
- throw new RuntimeException("Operator ran out of memory");
} else {
return true;
}