PIG-5173: Script with multiple splits fails with Invalid dag containing 0 vertices (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1785370 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index b99e1d6..0f4ae72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -89,6 +89,8 @@
  
 BUG FIXES
 
+PIG-5173: Script with multiple splits fails with Invalid dag containing 0 vertices (rohini)
+
 PIG-5159: Fix Pig not saving grunt history (szita via rohini)
 
 PIG-5127: Test fail when running test-core-mrtez (daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
index 69e96c2..5b9547b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
@@ -291,7 +291,7 @@
         Set<TezOperator> splitters2 = new HashSet<>();
         Set<TezOperator> processedPredecessors = new HashSet<>();
         // Find predecessors which are splitters
-        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1, false);
         if (!splitters1.isEmpty()) {
             // For the successor, traverse rest of the plan below it and
             // search the predecessors of its successors to find any predecessor that might be a splitter.
@@ -300,7 +300,7 @@
             processedPredecessors.clear();
             processedPredecessors.add(successor);
             for (TezOperator succ : allSuccs) {
-                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2, true);
             }
             // Find the common ones
             splitters1.retainAll(splitters2);
@@ -309,7 +309,7 @@
     }
 
     private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
-            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters, boolean stopAtSplit) {
         List<TezOperator> predecessors = plan.getPredecessors(tezOp);
         if (predecessors != null) {
             for (TezOperator pred : predecessors) {
@@ -319,9 +319,13 @@
                 }
                 if (pred.isSplitter()) {
                     splitters.add(pred);
+                    if (!stopAtSplit) {
+                        processedPredecessors.add(pred);
+                        fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit);
+                    }
                 } else if (!pred.needSegmentBelow()) {
                     processedPredecessors.add(pred);
-                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters, stopAtSplit);
                 }
             }
         }
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld
new file mode 100644
index 0000000..cbfcd63
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld
@@ -0,0 +1,105 @@
+#--------------------------------------------------
+# There are 3 DAGs in the session
+#--------------------------------------------------
+Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,
+Tez DAG pig-1_scope-1	->	Tez DAG pig-2_scope-2,
+Tez DAG pig-2_scope-2
+
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-45
+
+Tez vertex scope-45
+# Plan on vertex
+a: Split - scope-76
+|   |
+|   a: Store(file:///tmp/pigoutput/Dir0:org.apache.pig.builtin.PigStorage) - scope-4
+|   |
+|   Store(file:/tmp/temp-1456742965/tmp774375955:org.apache.pig.impl.io.InterStorage) - scope-72
+|   |
+|   |---a1: Filter[bag] - scope-7
+|       |   |
+|       |   Equal To[boolean] - scope-11
+|       |   |
+|       |   |---Cast[int] - scope-9
+|       |   |   |
+|       |   |   |---Project[bytearray][0] - scope-8
+|       |   |
+|       |   |---Constant(5) - scope-10
+|
+|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+#--------------------------------------------------
+# TEZ DAG plan: pig-1_scope-1
+#--------------------------------------------------
+Tez vertex scope-52
+Tez vertex scope-54	->	Tez vertex scope-58,
+Tez vertex scope-58	->	Tez vertex scope-67,
+Tez vertex scope-67
+
+Tez vertex scope-52
+# Plan on vertex
+a1: Store(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-15
+|
+|---Load(file:/tmp/temp-1456742965/tmp774375955:org.apache.pig.impl.io.InterStorage) - scope-73
+Tez vertex scope-54
+# Plan on vertex
+a2: Local Rearrange[tuple]{tuple}(true) - scope-57	->	 scope-58
+|   |
+|   Project[tuple][*] - scope-56
+|
+|---Load(file:/tmp/temp-1456742965/tmp774375955:org.apache.pig.impl.io.InterStorage) - scope-74
+Tez vertex scope-58
+# Combine plan on edge <scope-54>
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
+# Plan on vertex
+a2: Split - scope-77
+|   |
+|   a2: Store(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-22
+|   |
+|   a3: Local Rearrange[tuple]{bytearray}(false) - scope-29	->	 scope-67
+|   |   |
+|   |   Project[bytearray][0] - scope-30
+|
+|---New For Each(true)[bag] - scope-61
+    |   |
+    |   Project[tuple][0] - scope-60
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-59
+Tez vertex scope-67
+# Plan on vertex
+a3: Store(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-31
+|
+|---a3: Package(Packager)[tuple]{bytearray} - scope-28
+#--------------------------------------------------
+# TEZ DAG plan: pig-2_scope-2
+#--------------------------------------------------
+Tez vertex scope-68	->	Tez vertex scope-71,
+Tez vertex scope-70	->	Tez vertex scope-71,
+Tez vertex scope-71
+
+Tez vertex scope-68
+# Plan on vertex
+c: Local Rearrange[tuple]{bytearray}(false) - scope-37	->	 scope-71
+|   |
+|   Project[bytearray][0] - scope-38
+|
+|---Load(file:/tmp/temp-1456742965/tmp774375955:org.apache.pig.impl.io.InterStorage) - scope-75
+Tez vertex scope-70
+# Plan on vertex
+c: Local Rearrange[tuple]{bytearray}(false) - scope-39	->	 scope-71
+|   |
+|   Project[bytearray][0] - scope-40
+|
+|---b: Load(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-32
+Tez vertex scope-71
+# Plan on vertex
+c: Store(file:///tmp/pigoutput/Dir4:org.apache.pig.builtin.PigStorage) - scope-44
+|
+|---c: New For Each(true,true)[tuple] - scope-43
+    |   |
+    |   Project[bag][1] - scope-41
+    |   |
+    |   Project[bag][2] - scope-42
+    |
+    |---c: Package(Packager)[tuple]{bytearray} - scope-36
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index c7dec22..21f03e8 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -203,6 +203,25 @@
         resetScope();
         resetFileLocalizer();
         run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
+
+        // Three levels of splits - a, a1 and a2.
+        // One split above and one split below a1 which is the split to be replaced with tmp store.
+        query =
+                "a = load 'file:///tmp/input';" +
+                "store a into 'file:///tmp/pigoutput/Dir0';" +
+                "a1 = filter a by $0 == 5;" +
+                "store a1 into 'file:///tmp/pigoutput/Dir1';" +
+                "a2 = distinct a1;" +
+                "store a2 into 'file:///tmp/pigoutput/Dir2';" +
+                "a3 = group a2 by $0;" +
+                "store a3 into 'file:///tmp/pigoutput/Dir3';" +
+                "b = load 'file:///tmp/pigoutput/Dir3';" +
+                "c = join a1 by $0, b by $0;" +
+                "store c into 'file:///tmp/pigoutput/Dir4';";
+
+        resetScope();
+        resetFileLocalizer();
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld");
     }
 
     @Test