PIG-5271: StackOverflowError when compiling in Tez mode (with union and replicated join) (knoguchi)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1809051 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 62bc70c..1705b46 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,6 +48,8 @@
BUG FIXES
+PIG-5271: StackOverflowError when compiling in Tez mode (with union and replicated join) (knoguchi)
+
PIG-5299: PartitionFilterOptimizer failing at compile time (knoguchi)
PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
index a367f85..e66fb0a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
@@ -54,15 +54,6 @@
this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
}
- private void addAllPredecessors(TezOperator tezOp, List<TezOperator> predsList) {
- if (getPlan().getPredecessors(tezOp) != null) {
- for (TezOperator pred : getPlan().getPredecessors(tezOp)) {
- predsList.add(pred);
- addAllPredecessors(pred, predsList);
- }
- }
- }
-
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
try {
@@ -88,7 +79,7 @@
}
for (TezOperator successor : successors) {
- List<TezOperator> predecessors = new ArrayList<TezOperator>(getPlan().getPredecessors(successor));
+ HashSet<TezOperator> predecessors = new HashSet<TezOperator>(getPlan().getPredecessors(successor));
predecessors.remove(tezOp);
if (!predecessors.isEmpty()) {
// If has other dependency that conflicts with other splittees, don't merge into split
@@ -103,16 +94,16 @@
for (TezOperator predecessor : getPlan().getPredecessors(successor)) {
if (predecessor != tezOp) {
predecessors.add(predecessor);
- addAllPredecessors(predecessor, predecessors);
+ TezCompilerUtil.addAllPredecessors(getPlan(), predecessor, predecessors);
}
}
- List<TezOperator> toMergeSuccPredecessors = new ArrayList<TezOperator>(successors);
+ Set<TezOperator> toMergeSuccPredecessors = new HashSet<TezOperator>(successors);
toMergeSuccPredecessors.remove(successor);
for (TezOperator splittee : splittees) {
for (TezOperator spliteePred : getPlan().getPredecessors(splittee)) {
if (spliteePred != tezOp) {
toMergeSuccPredecessors.add(spliteePred);
- addAllPredecessors(spliteePred, toMergeSuccPredecessors);
+ TezCompilerUtil.addAllPredecessors(getPlan(), spliteePred, toMergeSuccPredecessors);
}
}
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
index f68aeb8..55cfb5a 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
@@ -224,6 +224,19 @@
PhysicalPlan splitPredPlan = splitPredOp.plan;
if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It has to be. But check anyways
+ for( TezOperator op : predecessors ) {
+ if( !op.getOperatorKey().equals(splitPredKey)) {
+ Set<TezOperator> allNonMemberPredecessorsAncestors = new HashSet<TezOperator>();
+ TezCompilerUtil.addAllPredecessors(tezPlan, op, allNonMemberPredecessorsAncestors);
+ // If any of the nonMemberPredecessor's ancestors(recursive predecessor)
+ // is from the single unionmember, then we stop the merge effort to avoid creating
+ // an illegal loop.
+ if( allNonMemberPredecessorsAncestors.contains(splitPredOp) ) {
+ return;
+ }
+ }
+ }
+
try {
connectUnionNonMemberPredecessorsToSplit(unionOp, splitPredOp, predecessors);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
index 16b1b6b..99d8857 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
@@ -21,6 +21,7 @@
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.pig.PigException;
@@ -300,4 +301,15 @@
return false;
}
+ public static void addAllPredecessors(TezOperPlan tezPlan, TezOperator tezOp, Set<TezOperator> predSet) {
+ if (tezPlan.getPredecessors(tezOp) != null) {
+ for (TezOperator pred : tezPlan.getPredecessors(tezOp)) {
+ if( ! predSet.contains(pred) ) {
+ predSet.add(pred);
+ addAllPredecessors(tezPlan, pred, predSet);
+ }
+ }
+ }
+ }
+
}
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index 21f03e8..f99d6f3 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -1358,6 +1358,29 @@
run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld");
}
+ @Test
+ public void testJoinUnionSingleMemberOverlappingPredecessor() throws Exception {
+ String query =
+ "A = load 'file:///tmp/input1.txt' as (a1:int, a2:int);" +
+ "A1 = FILTER A by a1 > 10;" +
+ "A2 = FILTER A by a2 > 10;" +
+ "B = UNION A1, A2;" +
+ "C = join A1 by a1, A2 by a1;" +
+ "D = DISTINCT C;" +
+ "Z = join B by a1, D by A1::a1 using 'replicated'; " +
+ "store Z into 'file:///tmp/pigoutput';";
+ /*
+ [A,A1,A2] -> [C], [B,Z]
+ [C] -> [D]
+ [D] -> [B,Z]
+
+ with bug PIG-5271, UnionOptimizor tries to combine [A,A1,A2] and [B,Z], and creates an incorrect loop.
+ [A,A1,A2,B,Z] -> [C] -> [D] -> [A,A1,A2,B,Z]
+ */
+
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-22.gld");
+ }
+
private String getProperty(String property) {
return pigServer.getPigContext().getProperties().getProperty(property);
}