PIG-5033: MultiQueryOptimizerTez creates bad plan with union, split and FRJoin (rohini,tmwoordruff via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1767319 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 261b304..7e70913 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,8 @@
BUG FIXES
+PIG-5033: MultiQueryOptimizerTez creates bad plan with union, split and FRJoin (rohini,tmwoordruff via rohini)
+
PIG-4934: SET command does not work well with deprecated settings (szita via daijy)
PIG-4798: big integer literals fail to parse (szita via daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
index a084b96..a367f85 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
@@ -153,6 +153,8 @@
}
}
if (getPlan().getSuccessors(successor) != null) {
+ nonPackageInputSuccessors.clear();
+ toMergeSuccessors.clear();
for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
if (succSuccessor.isUnion()) {
if (!(unionOptimizerOn &&
@@ -171,7 +173,13 @@
continue;
}
}
- toMergeSuccessors.add(succSuccessor);
+ if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
+ // Output goes to scalar or POFRJoinTez in the union operator
+ // We need to ensure it is the only one to avoid parallel edges
+ canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
+ } else {
+ toMergeSuccessors.add(succSuccessor);
+ }
List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
if (unionSuccessors != null) {
for (TezOperator unionSuccessor : unionSuccessors) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
index 14c416c..16b1b6b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
@@ -36,6 +36,7 @@
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
@@ -198,8 +199,8 @@
public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException {
try {
- List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
- for (TezInput input : inputs) {
+ List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class);
+ for (POFRJoinTez input : inputs) {
if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
return true;
}
diff --git a/test/e2e/pig/tests/multiquery.conf b/test/e2e/pig/tests/multiquery.conf
index 934d983..0ac20ef 100644
--- a/test/e2e/pig/tests/multiquery.conf
+++ b/test/e2e/pig/tests/multiquery.conf
@@ -728,6 +728,20 @@
c = rank b by name ASC, age DESC DENSE;
store c into ':OUTPATH:';\,
},
+ {
+ # Union + Split + Two replicate join
+ 'num' => 12,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+a1 = filter a by gpa is null or gpa <= 3.9;
+a2 = filter a by gpa < 2;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c1 = filter c by age < 30;
+c2 = filter c by age > 50;
+d = join b by name, c1 by name using 'replicated';
+e = join d by b::name, c2 by name using 'replicated';
+store e into ':OUTPATH:';\,
+ }
] # end of tests
},
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld
new file mode 100644
index 0000000..a2fc8e5
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld
@@ -0,0 +1,109 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-56 -> Tez vertex scope-58,Tez vertex scope-60,
+Tez vertex scope-58 -> Tez vertex scope-62,
+Tez vertex scope-60 -> Tez vertex scope-62,
+Tez vertex scope-66 -> Tez vertex scope-68,Tez vertex scope-70,
+Tez vertex scope-68 -> Tez vertex scope-62,
+Tez vertex scope-70 -> Tez vertex scope-62,
+Tez vertex scope-62
+
+Tez vertex scope-56
+# Plan on vertex
+POValueOutputTez - scope-57 -> [scope-58, scope-60]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-58
+# Plan on vertex
+POValueOutputTez - scope-64 -> [scope-62]
+|
+|---POValueInputTez - scope-59 <- scope-56
+Tez vertex scope-60
+# Plan on vertex
+POValueOutputTez - scope-65 -> [scope-62]
+|
+|---b: Filter[bag] - scope-13
+ | |
+ | Equal To[boolean] - scope-16
+ | |
+ | |---Project[int][0] - scope-14
+ | |
+ | |---Constant(2) - scope-15
+ |
+ |---POValueInputTez - scope-61 <- scope-56
+Tez vertex scope-66
+# Plan on vertex
+POValueOutputTez - scope-67 -> [scope-68, scope-70]
+|
+|---c: New For Each(false,false)[bag] - scope-25
+ | |
+ | Cast[int] - scope-20
+ | |
+ | |---Project[bytearray][0] - scope-19
+ | |
+ | Cast[int] - scope-23
+ | |
+ | |---Project[bytearray][1] - scope-22
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-18
+Tez vertex scope-68
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-38 -> scope-62
+| |
+| Project[int][0] - scope-34
+|
+|---e: Filter[bag] - scope-29
+ | |
+ | Less Than[boolean] - scope-32
+ | |
+ | |---Project[int][1] - scope-30
+ | |
+ | |---Constant(2) - scope-31
+ |
+ |---POValueInputTez - scope-69 <- scope-66
+Tez vertex scope-70
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-52 -> scope-62
+| |
+| Project[int][0] - scope-48
+|
+|---f: Filter[bag] - scope-43
+ | |
+ | Greater Than[boolean] - scope-46
+ | |
+ | |---Project[int][1] - scope-44
+ | |
+ | |---Constant(5) - scope-45
+ |
+ |---POValueInputTez - scope-71 <- scope-66
+Tez vertex scope-62
+# Plan on vertex
+h: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-55
+|
+|---h: FRJoin[tuple] - scope-49 <- scope-70
+ | |
+ | Project[int][0] - scope-47
+ | |
+ | Project[int][0] - scope-48
+ |
+ |---g: FRJoin[tuple] - scope-35 <- scope-68
+ | |
+ | Project[int][0] - scope-33
+ | |
+ | Project[int][0] - scope-34
+ |
+ |---POShuffledValueInputTez - scope-63 <- [scope-58, scope-60]
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld
new file mode 100644
index 0000000..4bd4526
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-66 -> Tez vertex scope-56,Tez vertex scope-70,
+Tez vertex scope-70 -> Tez vertex scope-56,
+Tez vertex scope-56
+
+Tez vertex scope-66
+# Plan on vertex
+c: Split - scope-73
+| |
+| Local Rearrange[tuple]{int}(false) - scope-38 -> scope-56
+| | |
+| | Project[int][0] - scope-34
+| |
+| |---e: Filter[bag] - scope-29
+| | |
+| | Less Than[boolean] - scope-32
+| | |
+| | |---Project[int][1] - scope-30
+| | |
+| | |---Constant(2) - scope-31
+| |
+| POValueOutputTez - scope-67 -> [scope-70]
+|
+|---c: New For Each(false,false)[bag] - scope-25
+ | |
+ | Cast[int] - scope-20
+ | |
+ | |---Project[bytearray][0] - scope-19
+ | |
+ | Cast[int] - scope-23
+ | |
+ | |---Project[bytearray][1] - scope-22
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-18
+Tez vertex scope-70
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-52 -> scope-56
+| |
+| Project[int][0] - scope-48
+|
+|---f: Filter[bag] - scope-43
+ | |
+ | Greater Than[boolean] - scope-46
+ | |
+ | |---Project[int][1] - scope-44
+ | |
+ | |---Constant(5) - scope-45
+ |
+ |---POValueInputTez - scope-71 <- scope-66
+Tez vertex scope-56
+# Plan on vertex
+a: Split - scope-72
+| |
+| h: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-90 -> scope-55
+| |
+| |---h: FRJoin[tuple] - scope-82 <- scope-70
+| | |
+| | Project[int][0] - scope-83
+| | |
+| | Project[int][0] - scope-84
+| |
+| |---g: FRJoin[tuple] - scope-74 <- scope-66
+| | |
+| | Project[int][0] - scope-75
+| | |
+| | Project[int][0] - scope-76
+| |
+| h: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-107 -> scope-55
+| |
+| |---h: FRJoin[tuple] - scope-99 <- scope-70
+| | |
+| | Project[int][0] - scope-100
+| | |
+| | Project[int][0] - scope-101
+| |
+| |---g: FRJoin[tuple] - scope-91 <- scope-66
+| | |
+| | Project[int][0] - scope-92
+| | |
+| | Project[int][0] - scope-93
+| |
+| |---b: Filter[bag] - scope-13
+| | |
+| | Equal To[boolean] - scope-16
+| | |
+| | |---Project[int][0] - scope-14
+| | |
+| | |---Constant(2) - scope-15
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld
new file mode 100644
index 0000000..9752f74
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-55 -> Tez vertex scope-57,
+Tez vertex scope-56 -> Tez vertex scope-57,
+Tez vertex scope-61 -> Tez vertex scope-63,Tez vertex scope-65,
+Tez vertex scope-63 -> Tez vertex scope-57,
+Tez vertex scope-65 -> Tez vertex scope-57,
+Tez vertex scope-57
+
+Tez vertex scope-55
+# Plan on vertex
+POValueOutputTez - scope-59 -> [scope-57]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-56
+# Plan on vertex
+POValueOutputTez - scope-60 -> [scope-57]
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-61
+# Plan on vertex
+POValueOutputTez - scope-62 -> [scope-63, scope-65]
+|
+|---c: New For Each(false,false)[bag] - scope-24
+ | |
+ | Cast[int] - scope-19
+ | |
+ | |---Project[bytearray][0] - scope-18
+ | |
+ | Cast[int] - scope-22
+ | |
+ | |---Project[bytearray][1] - scope-21
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-63
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-37 -> scope-57
+| |
+| Project[int][0] - scope-33
+|
+|---e: Filter[bag] - scope-28
+ | |
+ | Less Than[boolean] - scope-31
+ | |
+ | |---Project[int][1] - scope-29
+ | |
+ | |---Constant(2) - scope-30
+ |
+ |---POValueInputTez - scope-64 <- scope-61
+Tez vertex scope-65
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-51 -> scope-57
+| |
+| Project[int][0] - scope-47
+|
+|---f: Filter[bag] - scope-42
+ | |
+ | Greater Than[boolean] - scope-45
+ | |
+ | |---Project[int][1] - scope-43
+ | |
+ | |---Constant(5) - scope-44
+ |
+ |---POValueInputTez - scope-66 <- scope-61
+Tez vertex scope-57
+# Plan on vertex
+h: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-54
+|
+|---h: FRJoin[tuple] - scope-48 <- scope-65
+ | |
+ | Project[int][0] - scope-46
+ | |
+ | Project[int][0] - scope-47
+ |
+ |---g: FRJoin[tuple] - scope-34 <- scope-63
+ | |
+ | Project[int][0] - scope-32
+ | |
+ | Project[int][0] - scope-33
+ |
+ |---POShuffledValueInputTez - scope-58 <- [scope-55, scope-56]
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld
new file mode 100644
index 0000000..58a8602
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld
@@ -0,0 +1,103 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-55 -> Tez vertex scope-57,
+Tez vertex scope-56 -> Tez vertex scope-57,
+Tez vertex scope-61 -> Tez vertex scope-57,Tez vertex scope-65,
+Tez vertex scope-65 -> Tez vertex scope-57,
+Tez vertex scope-57
+
+Tez vertex scope-55
+# Plan on vertex
+POValueOutputTez - scope-59 -> [scope-57]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+ | |
+ | Cast[int] - scope-2
+ | |
+ | |---Project[bytearray][0] - scope-1
+ | |
+ | Cast[int] - scope-5
+ | |
+ | |---Project[bytearray][1] - scope-4
+ |
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-56
+# Plan on vertex
+POValueOutputTez - scope-60 -> [scope-57]
+|
+|---b: New For Each(false,false)[bag] - scope-15
+ | |
+ | Cast[int] - scope-10
+ | |
+ | |---Project[bytearray][0] - scope-9
+ | |
+ | Cast[int] - scope-13
+ | |
+ | |---Project[bytearray][1] - scope-12
+ |
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-61
+# Plan on vertex
+c: Split - scope-67
+| |
+| Local Rearrange[tuple]{int}(false) - scope-37 -> scope-57
+| | |
+| | Project[int][0] - scope-33
+| |
+| |---e: Filter[bag] - scope-28
+| | |
+| | Less Than[boolean] - scope-31
+| | |
+| | |---Project[int][1] - scope-29
+| | |
+| | |---Constant(2) - scope-30
+| |
+| POValueOutputTez - scope-62 -> [scope-65]
+|
+|---c: New For Each(false,false)[bag] - scope-24
+ | |
+ | Cast[int] - scope-19
+ | |
+ | |---Project[bytearray][0] - scope-18
+ | |
+ | Cast[int] - scope-22
+ | |
+ | |---Project[bytearray][1] - scope-21
+ |
+ |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-17
+Tez vertex scope-65
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-51 -> scope-57
+| |
+| Project[int][0] - scope-47
+|
+|---f: Filter[bag] - scope-42
+ | |
+ | Greater Than[boolean] - scope-45
+ | |
+ | |---Project[int][1] - scope-43
+ | |
+ | |---Constant(5) - scope-44
+ |
+ |---POValueInputTez - scope-66 <- scope-61
+Tez vertex scope-57
+# Plan on vertex
+h: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-54
+|
+|---h: FRJoin[tuple] - scope-48 <- scope-65
+ | |
+ | Project[int][0] - scope-46
+ | |
+ | Project[int][0] - scope-47
+ |
+ |---g: FRJoin[tuple] - scope-34 <- scope-61
+ | |
+ | Project[int][0] - scope-32
+ | |
+ | Project[int][0] - scope-33
+ |
+ |---POShuffledValueInputTez - scope-58 <- [scope-55, scope-56]
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index 428b16e..cefd1fb 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -645,6 +645,46 @@
}
@Test
+ public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception {
+ // Replicate joins are from a split
+ String query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = load 'file:///tmp/input2' as (x:int, y:int);" +
+ "c = load 'file:///tmp/input3' as (x:int, y:int);" +
+ "d = union a, b;" +
+ "e = filter c by y < 2;" +
+ "f = filter c by y > 5;" +
+ "g = join d by x, e by x using 'replicated';" +
+ "h = join g by d::x, f by x using 'replicated';" +
+ "store h into 'file:///tmp/pigoutput';";
+
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld");
+
+ // Union is also from a split
+ query =
+ "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+ "b = filter a by x == 2;" +
+ "c = load 'file:///tmp/input3' as (x:int, y:int);" +
+ "d = union a, b;" +
+ "e = filter c by y < 2;" +
+ "f = filter c by y > 5;" +
+ "g = join d by x, e by x using 'replicated';" +
+ "h = join g by d::x, f by x using 'replicated';" +
+ "store h into 'file:///tmp/pigoutput';";
+
+ resetScope();
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld");
+ resetScope();
+ setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false);
+ run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld");
+ }
+
+ @Test
public void testUnionStore() throws Exception {
String query =
"a = load 'file:///tmp/input' as (x:int, y:chararray);" +