PIG-5118: Script fails with Invalid dag containing 0 vertices (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1780969 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d49cdf9..672a69f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -189,6 +189,8 @@
 
 BUG FIXES
 
+PIG-5118: Script fails with Invalid dag containing 0 vertices (rohini)
+
 PIG-5111: e2e Utf8Test fails in local mode (rohini)
 
 PIG-5112: Cleanup pig-template.xml (daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
index f71ded3..4b5da89 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
@@ -22,6 +22,7 @@
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -422,9 +423,11 @@
         TezCompiler comp = new TezCompiler(php, pc);
         comp.compile();
         TezPlanContainer planContainer = comp.getPlanContainer();
-        for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
-                .getKeys().entrySet()) {
-            TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+        // Sort the keys so that the printed test plan is the same on JDK 7 and JDK 8
+        List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet());
+        Collections.sort(opKeys);
+        for (OperatorKey opKey : opKeys) {
+            TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan();
             optimize(tezPlan, pc);
         }
         return planContainer;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index c112528..79739e9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -1099,7 +1099,7 @@
         indexerTezOp.setDontEstimateParallelism(true);
 
         POStore st = TezCompilerUtil.getStore(scope, nig);
-        FileSpec strFile = getTempFileSpec();
+        FileSpec strFile = getTempFileSpec(pigContext);
         st.setSFile(strFile);
         indexAggrOper.plan.addAsLeaf(st);
         indexAggrOper.setClosed(true);
@@ -1266,7 +1266,7 @@
                 rightTezOprAggr.setDontEstimateParallelism(true);
 
                 POStore st = TezCompilerUtil.getStore(scope, nig);
-                FileSpec strFile = getTempFileSpec();
+                FileSpec strFile = getTempFileSpec(pigContext);
                 st.setSFile(strFile);
                 rightTezOprAggr.plan.addAsLeaf(st);
                 rightTezOprAggr.setClosed(true);
@@ -1862,7 +1862,7 @@
      * @return
      * @throws IOException
      */
-    private FileSpec getTempFileSpec() throws IOException {
+    public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException {
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
index 9c8269a..4f9b75b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
@@ -25,8 +25,9 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -217,8 +218,12 @@
             newPlan.add(node);
         }
 
-        Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
-        for (TezOperator from : mFromEdges.keySet()) {
+        // Use a LinkedHashSet and iterate in sorted order so that the
+        // printed test plan is the same on JDK 7 and JDK 8
+        Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>();
+        List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet());
+        Collections.sort(fromEdges);
+        for (TezOperator from : fromEdges) {
             List<TezOperator> tos = mFromEdges.get(from);
             for (TezOperator to : tos) {
                 if (list.contains(from) || list.contains(to)) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
index 1966280..69e96c2 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
@@ -29,9 +29,14 @@
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -160,100 +165,178 @@
             return;
         }
 
-        TezOperator operToSegment = null;
-        List<TezOperator> succs = new ArrayList<TezOperator>();
+        List<TezOperator> opersToSegment = null;
         try {
             // Split top down from root to leaves
-            SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
+            // Get list of operators closer to the root that can be segmented together
+            FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan);
             finder.visit();
-            operToSegment = finder.getOperatorToSegment();
+            opersToSegment = finder.getOperatorsToSegment();
         } catch (VisitorException e) {
             throw new PlanException(e);
         }
-
-        if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
-            succs.addAll(tezOperPlan.getSuccessors(operToSegment));
-            for (TezOperator succ : succs) {
-                tezOperPlan.disconnect(operToSegment, succ);
+        if (!opersToSegment.isEmpty()) {
+            Set<TezOperator> commonSplitterPredecessors = new HashSet<>();
+            for (TezOperator operToSegment : opersToSegment) {
+                for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) {
+                    commonSplitterPredecessors
+                            .addAll(getCommonSplitterPredecessors(tezOperPlan,
+                                    operToSegment, succ));
+                }
             }
-            for (TezOperator succ : succs) {
-                try {
-                    if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
-                        // Has already been moved to a new plan by previous successor
-                        // as part of dependency. It could have been further split.
-                        // So walk the full plan to find the new plan and connect
-                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
-                        finder.visit();
-                        connect(planNode, finder.getPlanContainerNode());
-                        continue;
+
+            if (commonSplitterPredecessors.isEmpty()) {
+                List<TezOperator> allSuccs = new ArrayList<TezOperator>();
+                // Disconnect all the successors and move them to a new plan
+                for (TezOperator operToSegment : opersToSegment) {
+                    List<TezOperator> succs = new ArrayList<TezOperator>();
+                    succs.addAll(tezOperPlan.getSuccessors(operToSegment));
+                    allSuccs.addAll(succs);
+                    for (TezOperator succ : succs) {
+                        tezOperPlan.disconnect(operToSegment, succ);
                     }
-                    TezOperPlan newOperPlan = new TezOperPlan();
+                }
+                TezOperPlan newOperPlan = new TezOperPlan();
+                for (TezOperator succ : allSuccs) {
                     tezOperPlan.moveTree(succ, newOperPlan);
-                    TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
-                            generateNodeOperatorKey(), newOperPlan);
-                    add(newPlanNode);
-                    connect(planNode, newPlanNode);
-                    split(newPlanNode);
-                    if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
-                        // On further split, the successor moved to a new plan container.
-                        // Connect to that
-                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
-                        finder.visit();
-                        disconnect(planNode, newPlanNode);
-                        connect(planNode, finder.getPlanContainerNode());
+                }
+                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+                        generateNodeOperatorKey(), newOperPlan);
+                add(newPlanNode);
+                connect(planNode, newPlanNode);
+                split(newPlanNode);
+            } else {
+                // If there is a common splitter predecessor between operToSegment and the successor,
+                // we have to separate out that split to be able to segment.
+                // So we store the output of split to a temp store and then change the
+                // splittees to load from it.
+                String scope = opersToSegment.get(0).getOperatorKey().getScope();
+                for (TezOperator splitter : commonSplitterPredecessors) {
+                    try {
+                        List<TezOperator> succs = new ArrayList<TezOperator>();
+                        succs.addAll(tezOperPlan.getSuccessors(splitter));
+                        FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext);
+                        POStore tmpStore = getTmpStore(scope, fileSpec);
+                        // Replace POValueOutputTez with POStore
+                        splitter.plan.remove(splitter.plan.getLeaves().get(0));
+                        splitter.plan.addAsLeaf(tmpStore);
+                        splitter.segmentBelow = true;
+                        splitter.setSplitter(false);
+                        for (TezOperator succ : succs) {
+                            // Replace POValueInputTez with POLoad
+                            POLoad tmpLoad = getTmpLoad(scope, fileSpec);
+                            succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad);
+                        }
+                    } catch (Exception e) {
+                        throw new PlanException(e);
                     }
-                } catch (VisitorException e) {
-                    throw new PlanException(e);
                 }
             }
             split(planNode);
         }
     }
 
-    private static class SegmentOperatorFinder extends TezOpPlanVisitor {
+    private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor {
 
-        private TezOperator operToSegment;
+        private List<TezOperator> opersToSegment = new ArrayList<>();
 
-        public SegmentOperatorFinder(TezOperPlan plan) {
+        public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) {
             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         }
 
-        public TezOperator getOperatorToSegment() {
-            return operToSegment;
+        public List<TezOperator> getOperatorsToSegment() {
+            return opersToSegment;
         }
 
         @Override
-        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
-            if (tezOperator.needSegmentBelow() && operToSegment == null) {
-                operToSegment = tezOperator;
+        public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) {
+                if (opersToSegment.isEmpty()) {
+                    opersToSegment.add(tezOp);
+                } else {
+                    // If the operator does not have dependency on previous
+                    // operators chosen for segmenting then add it to the
+                    // operators to be segmented together
+                    if (!hasPredecessor(tezOp, opersToSegment)) {
+                        opersToSegment.add(tezOp);
+                    }
+                }
             }
         }
 
+        /**
+         * Checks whether tezOp has any operator in opsToCheck as an ancestor,
+         * either as an immediate predecessor or any number of levels further up.
+         * @param tezOp operator whose predecessor chain is walked upwards
+         * @param opsToCheck candidate ancestors to look for
+         * @return true if an operator in opsToCheck is reachable through predecessors
+         */
+        private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) {
+            List<TezOperator> predecessors = getPlan().getPredecessors(tezOp);
+            if (predecessors != null) {
+                for (TezOperator pred : predecessors) {
+                    // Test against the parameter, not the opersToSegment field.
+                    if (opsToCheck.contains(pred) || hasPredecessor(pred, opsToCheck)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
     }
 
-    private static class TezOperatorFinder extends TezPlanContainerVisitor {
-
-        private TezPlanContainerNode planContainerNode;
-        private TezOperator operatorToFind;
-
-        public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
-            super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
-            this.operatorToFind = operatorToFind;
+    private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) {
+        Set<TezOperator> splitters1 = new HashSet<>();
+        Set<TezOperator> splitters2 = new HashSet<>();
+        Set<TezOperator> processedPredecessors = new HashSet<>();
+        // Find predecessors which are splitters
+        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+        if (!splitters1.isEmpty()) {
+            // For the successor, traverse rest of the plan below it and
+            // search the predecessors of its successors to find any predecessor that might be a splitter.
+            Set<TezOperator> allSuccs = new HashSet<>();
+            getAllSuccessors(plan, successor, allSuccs);
+            processedPredecessors.clear();
+            processedPredecessors.add(successor);
+            for (TezOperator succ : allSuccs) {
+                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+            }
+            // Find the common ones
+            splitters1.retainAll(splitters2);
         }
+        return splitters1;
+    }
 
-        public TezPlanContainerNode getPlanContainerNode() {
-            return planContainerNode;
-        }
-
-        @Override
-        public void visitTezPlanContainerNode(
-                TezPlanContainerNode tezPlanContainerNode)
-                throws VisitorException {
-            if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
-                planContainerNode = tezPlanContainerNode;
+    private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
+            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+        List<TezOperator> predecessors = plan.getPredecessors(tezOp);
+        if (predecessors != null) {
+            for (TezOperator pred : predecessors) {
+                // Skip processing already processed predecessor to avoid loops
+                if (processedPredecessors.contains(pred)) {
+                    continue;
+                }
+                if (pred.isSplitter()) {
+                    splitters.add(pred);
+                } else if (!pred.needSegmentBelow()) {
+                    processedPredecessors.add(pred);
+                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+                }
             }
         }
+    }
 
+    private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) {
+        List<TezOperator> successors = plan.getSuccessors(tezOp);
+        if (successors != null) {
+            for (TezOperator succ : successors) {
+                if (!allSuccs.contains(succ)) {
+                    allSuccs.add(succ);
+                    getAllSuccessors(plan, succ, allSuccs);
+                }
+            }
+        }
     }
 
     private synchronized OperatorKey generateNodeOperatorKey() {
@@ -267,6 +350,21 @@
         scopeId = 0;
     }
 
+    private POLoad getTmpLoad(String scope, FileSpec fileSpec){
+        POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        ld.setPc(pigContext);
+        ld.setIsTmpLoad(true);
+        ld.setLFile(fileSpec);
+        return ld;
+    }
+
+    private POStore getTmpStore(String scope, FileSpec fileSpec){
+        POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        st.setIsTmpStore(true);
+        st.setSFile(fileSpec);
+        return new POStoreTez(st);
+    }
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/test/e2e/pig/tests/nightly.conf b/test/e2e/pig/tests/nightly.conf
index 42bdd77..1416ebd 100644
--- a/test/e2e/pig/tests/nightly.conf
+++ b/test/e2e/pig/tests/nightly.conf
@@ -2750,6 +2750,41 @@
 			},
 		],
         },
+        {
+        'name' => 'StoreLoad',
+        'tests' => [
+            {
+            'num' => 1,
+            'floatpostprocess' => 1,
+            'delimiter' => '    ',
+            'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa: double);
+b = filter a by age < 25;
+c = filter a by age > 70;
+store b into ':OUTPATH:.intermediate1' using PigStorage(',');
+store c into ':OUTPATH:.intermediate2' using PigStorage(',');
+d = load ':OUTPATH:.intermediate1' using PigStorage(',') as (name:chararray, age:int, gpa: double);
+e = load ':OUTPATH:.intermediate2' using PigStorage(',') as (name:chararray, age:int, gpa: double);
+f = join d by name, e by name;
+store f into ':OUTPATH:';\,
+            'notmq' => 1,
+            },
+            {
+            # Self join
+            'num' => 2,
+            'floatpostprocess' => 1,
+            'delimiter' => '    ',
+            'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa: double);
+b = filter a by name == 'nick miller';
+store b into ':OUTPATH:.intermediate' using PigStorage(',');
+c = load ':OUTPATH:.intermediate' using PigStorage(',') as (name:chararray, age:int, gpa: double);
+d = join a by name, c by name;
+store d into ':OUTPATH:';\,
+            'notmq' => 1,
+            },
+        ],
+        },
 
 	{
 		'name' => 'MergeJoin',
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld
deleted file mode 100644
index e8c5cc1..0000000
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld
+++ /dev/null
@@ -1,83 +0,0 @@
-#--------------------------------------------------
-# There are 5 DAGs in the session
-#--------------------------------------------------
-Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,Tez DAG pig-2_scope-2,Tez DAG pig-3_scope-3,
-Tez DAG pig-2_scope-2	->	Tez DAG pig-3_scope-3,Tez DAG pig-4_scope-4,
-Tez DAG pig-1_scope-1	->	Tez DAG pig-3_scope-3,
-Tez DAG pig-3_scope-3
-Tez DAG pig-4_scope-4
-
-#--------------------------------------------------
-# TEZ DAG plan: pig-0_scope-0
-#--------------------------------------------------
-Tez vertex scope-20
-
-Tez vertex scope-20
-# Plan on vertex
-a: Store(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-1
-|
-|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
-#--------------------------------------------------
-# TEZ DAG plan: pig-2_scope-2
-#--------------------------------------------------
-Tez vertex scope-21
-
-Tez vertex scope-21
-# Plan on vertex
-a: Store(file:///tmp/pigoutput/Dir2:BinStorage) - scope-3
-|
-|---a: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-2
-#--------------------------------------------------
-# TEZ DAG plan: pig-1_scope-1
-#--------------------------------------------------
-Tez vertex scope-23
-
-Tez vertex scope-23
-# Plan on vertex
-a: Store(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-7
-|
-|---a: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-6
-#--------------------------------------------------
-# TEZ DAG plan: pig-3_scope-3
-#--------------------------------------------------
-Tez vertex scope-24	->	Tez vertex scope-27,
-Tez vertex scope-26	->	Tez vertex scope-27,
-Tez vertex scope-25	->	Tez vertex scope-27,
-Tez vertex scope-27
-
-Tez vertex scope-24
-# Plan on vertex
-d: Local Rearrange[tuple]{bytearray}(false) - scope-13	->	 scope-27
-|   |
-|   Project[bytearray][0] - scope-14
-|
-|---a: Load(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-8
-Tez vertex scope-26
-# Plan on vertex
-d: Local Rearrange[tuple]{bytearray}(false) - scope-17	->	 scope-27
-|   |
-|   Project[bytearray][0] - scope-18
-|
-|---c: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-10
-Tez vertex scope-25
-# Plan on vertex
-d: Local Rearrange[tuple]{bytearray}(false) - scope-15	->	 scope-27
-|   |
-|   Project[bytearray][0] - scope-16
-|
-|---b: Load(file:///tmp/pigoutput/Dir2:BinStorage) - scope-9
-Tez vertex scope-27
-# Plan on vertex
-d: Store(file:///tmp/pigoutput/Dir5:org.apache.pig.builtin.PigStorage) - scope-19
-|
-|---d: Package(Packager)[tuple]{bytearray} - scope-12
-#--------------------------------------------------
-# TEZ DAG plan: pig-4_scope-4
-#--------------------------------------------------
-Tez vertex scope-22
-
-Tez vertex scope-22
-# Plan on vertex
-a: Store(file:///tmp/pigoutput/Dir4:org.apache.pig.builtin.PigStorage) - scope-5
-|
-|---a: Load(file:///tmp/pigoutput/Dir2:BinStorage) - scope-4
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld
index 7b9a397..8613a36 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld
@@ -1,11 +1,9 @@
 #--------------------------------------------------
-# There are 5 DAGs in the session
+# There are 3 DAGs in the session
 #--------------------------------------------------
-Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,Tez DAG pig-3_scope-3,Tez DAG pig-4_scope-4,
-Tez DAG pig-1_scope-1	->	Tez DAG pig-2_scope-2,Tez DAG pig-4_scope-4,
+Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,
+Tez DAG pig-1_scope-1	->	Tez DAG pig-2_scope-2,
 Tez DAG pig-2_scope-2
-Tez DAG pig-3_scope-3	->	Tez DAG pig-4_scope-4,
-Tez DAG pig-4_scope-4
 
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
@@ -21,54 +19,32 @@
 # TEZ DAG plan: pig-1_scope-1
 #--------------------------------------------------
 Tez vertex scope-21
+Tez vertex scope-23
 
 Tez vertex scope-21
 # Plan on vertex
 a: Store(file:///tmp/pigoutput/Dir2:BinStorage) - scope-3
 |
 |---a: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-2
-#--------------------------------------------------
-# TEZ DAG plan: pig-2_scope-2
-#--------------------------------------------------
-Tez vertex scope-22
-
-Tez vertex scope-22
-# Plan on vertex
-a: Store(file:///tmp/pigoutput/Dir4:org.apache.pig.builtin.PigStorage) - scope-5
-|
-|---a: Load(file:///tmp/pigoutput/Dir2:BinStorage) - scope-4
-#--------------------------------------------------
-# TEZ DAG plan: pig-3_scope-3
-#--------------------------------------------------
-Tez vertex scope-23
-
 Tez vertex scope-23
 # Plan on vertex
 a: Store(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-7
 |
 |---a: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-6
 #--------------------------------------------------
-# TEZ DAG plan: pig-4_scope-4
+# TEZ DAG plan: pig-2_scope-2
 #--------------------------------------------------
-Tez vertex scope-26	->	Tez vertex scope-27,
-Tez vertex scope-25	->	Tez vertex scope-27,
+Tez vertex scope-22
 Tez vertex scope-24	->	Tez vertex scope-27,
+Tez vertex scope-25	->	Tez vertex scope-27,
+Tez vertex scope-26	->	Tez vertex scope-27,
 Tez vertex scope-27
 
-Tez vertex scope-26
+Tez vertex scope-22
 # Plan on vertex
-d: Local Rearrange[tuple]{bytearray}(false) - scope-17	->	 scope-27
-|   |
-|   Project[bytearray][0] - scope-18
+a: Store(file:///tmp/pigoutput/Dir4:org.apache.pig.builtin.PigStorage) - scope-5
 |
-|---c: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-10
-Tez vertex scope-25
-# Plan on vertex
-d: Local Rearrange[tuple]{bytearray}(false) - scope-15	->	 scope-27
-|   |
-|   Project[bytearray][0] - scope-16
-|
-|---b: Load(file:///tmp/pigoutput/Dir2:BinStorage) - scope-9
+|---a: Load(file:///tmp/pigoutput/Dir2:BinStorage) - scope-4
 Tez vertex scope-24
 # Plan on vertex
 d: Local Rearrange[tuple]{bytearray}(false) - scope-13	->	 scope-27
@@ -76,6 +52,20 @@
 |   Project[bytearray][0] - scope-14
 |
 |---a: Load(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-25
+# Plan on vertex
+d: Local Rearrange[tuple]{bytearray}(false) - scope-15	->	 scope-27
+|   |
+|   Project[bytearray][0] - scope-16
+|
+|---b: Load(file:///tmp/pigoutput/Dir2:BinStorage) - scope-9
+Tez vertex scope-26
+# Plan on vertex
+d: Local Rearrange[tuple]{bytearray}(false) - scope-17	->	 scope-27
+|   |
+|   Project[bytearray][0] - scope-18
+|
+|---c: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-10
 Tez vertex scope-27
 # Plan on vertex
 d: Store(file:///tmp/pigoutput/Dir5:org.apache.pig.builtin.PigStorage) - scope-19
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld
new file mode 100644
index 0000000..050df64
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld
@@ -0,0 +1,72 @@
+#--------------------------------------------------
+# There are 2 DAGs in the session
+#--------------------------------------------------
+Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,
+Tez DAG pig-1_scope-1
+
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-32
+
+Tez vertex scope-32
+# Plan on vertex
+a: Split - scope-41
+|   |
+|   b: Store(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-9
+|   |
+|   |---b: Filter[bag] - scope-4
+|       |   |
+|       |   Equal To[boolean] - scope-8
+|       |   |
+|       |   |---Cast[int] - scope-6
+|       |   |   |
+|       |   |   |---Project[bytearray][0] - scope-5
+|       |   |
+|       |   |---Constant(1) - scope-7
+|   |
+|   c: Store(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-18
+|   |
+|   |---c: Filter[bag] - scope-13
+|       |   |
+|       |   Equal To[boolean] - scope-17
+|       |   |
+|       |   |---Cast[int] - scope-15
+|       |   |   |
+|       |   |   |---Project[bytearray][0] - scope-14
+|       |   |
+|       |   |---Constant(2) - scope-16
+|
+|---a: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-0
+#--------------------------------------------------
+# TEZ DAG plan: pig-1_scope-1
+#--------------------------------------------------
+Tez vertex scope-38	->	Tez vertex scope-40,
+Tez vertex scope-39	->	Tez vertex scope-40,
+Tez vertex scope-40
+
+Tez vertex scope-38
+# Plan on vertex
+f: Local Rearrange[tuple]{bytearray}(false) - scope-24	->	 scope-40
+|   |
+|   Project[bytearray][0] - scope-25
+|
+|---d: Load(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-10
+Tez vertex scope-39
+# Plan on vertex
+f: Local Rearrange[tuple]{bytearray}(false) - scope-26	->	 scope-40
+|   |
+|   Project[bytearray][0] - scope-27
+|
+|---e: Load(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-19
+Tez vertex scope-40
+# Plan on vertex
+f: Store(file:///tmp/pigoutput/Dir5:org.apache.pig.builtin.PigStorage) - scope-31
+|
+|---f: New For Each(true,true)[tuple] - scope-30
+    |   |
+    |   Project[bag][1] - scope-28
+    |   |
+    |   Project[bag][2] - scope-29
+    |
+    |---f: Package(Packager)[tuple]{bytearray} - scope-23
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld
new file mode 100644
index 0000000..c1b8357
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld
@@ -0,0 +1,84 @@
+#--------------------------------------------------
+# There are 2 DAGs in the session
+#--------------------------------------------------
+Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,
+Tez DAG pig-1_scope-1
+
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-39	->	Tez vertex scope-45,Tez vertex scope-56,
+Tez vertex scope-45
+Tez vertex scope-56
+
+Tez vertex scope-39
+# Plan on vertex
+a: Split - scope-60
+|   |
+|   b: Local Rearrange[tuple]{tuple}(true) - scope-44	->	 scope-45
+|   |   |
+|   |   Project[tuple][*] - scope-43
+|   |
+|   c: Local Rearrange[tuple]{bytearray}(false) - scope-24	->	 scope-56
+|   |   |
+|   |   Project[bytearray][0] - scope-25
+|
+|---a: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-45
+# Combine plan on edge <scope-39>
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
+# Plan on vertex
+b: Store(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-5
+|
+|---New For Each(true)[bag] - scope-48
+    |   |
+    |   Project[tuple][0] - scope-47
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-46
+Tez vertex scope-56
+# Plan on vertex
+c: Store(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-26
+|
+|---c: Package(Packager)[tuple]{bytearray} - scope-23
+#--------------------------------------------------
+# TEZ DAG plan: pig-1_scope-1
+#--------------------------------------------------
+Tez vertex scope-50	->	Tez vertex scope-49,Tez vertex scope-57,
+Tez vertex scope-49
+Tez vertex scope-57
+
+Tez vertex scope-50
+# Plan on vertex
+f: Split - scope-61
+|   |
+|   Local Rearrange[tuple]{bytearray}(false) - scope-16	->	 scope-49
+|   |   |
+|   |   Project[bytearray][0] - scope-12
+|   |
+|   Local Rearrange[tuple]{bytearray}(false) - scope-35	->	 scope-57
+|   |   |
+|   |   Project[bytearray][0] - scope-31
+|
+|---f: Load(file:///tmp/pigoutput/Dir4:org.apache.pig.builtin.PigStorage) - scope-7
+Tez vertex scope-49
+# Plan on vertex
+g: Store(file:///tmp/pigoutput/Dir4:org.apache.pig.builtin.PigStorage) - scope-19
+|
+|---g: FRJoin[tuple] - scope-13	<-	 scope-50
+    |   |
+    |   Project[bytearray][0] - scope-11
+    |   |
+    |   Project[bytearray][0] - scope-12
+    |
+    |---d: Load(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-6
+Tez vertex scope-57
+# Plan on vertex
+h: Store(file:///tmp/pigoutput/Dir5:org.apache.pig.builtin.PigStorage) - scope-38
+|
+|---h: FRJoin[tuple] - scope-32	<-	 scope-50
+    |   |
+    |   Project[bytearray][0] - scope-30
+    |   |
+    |   Project[bytearray][0] - scope-31
+    |
+    |---e: Load(file:///tmp/pigoutput/Dir3:org.apache.pig.builtin.PigStorage) - scope-27
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld
new file mode 100644
index 0000000..88f6595
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld
@@ -0,0 +1,69 @@
+#--------------------------------------------------
+# There are 3 DAGs in the session
+#--------------------------------------------------
+Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,
+Tez DAG pig-1_scope-1	->	Tez DAG pig-2_scope-2,
+Tez DAG pig-2_scope-2
+
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-25
+
+Tez vertex scope-25
+# Plan on vertex
+Store(file:/tmp/temp-994194982/tmp570752215:org.apache.pig.impl.io.InterStorage) - scope-33
+|
+|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+#--------------------------------------------------
+# TEZ DAG plan: pig-1_scope-1
+#--------------------------------------------------
+Tez vertex scope-27
+
+Tez vertex scope-27
+# Plan on vertex
+a1: Store(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-11
+|
+|---a1: Filter[bag] - scope-6
+    |   |
+    |   Equal To[boolean] - scope-10
+    |   |
+    |   |---Cast[int] - scope-8
+    |   |   |
+    |   |   |---Project[bytearray][0] - scope-7
+    |   |
+    |   |---Constant(5) - scope-9
+    |
+    |---Load(file:/tmp/temp-994194982/tmp570752215:org.apache.pig.impl.io.InterStorage) - scope-34
+#--------------------------------------------------
+# TEZ DAG plan: pig-2_scope-2
+#--------------------------------------------------
+Tez vertex scope-29	->	Tez vertex scope-32,
+Tez vertex scope-31	->	Tez vertex scope-32,
+Tez vertex scope-32
+
+Tez vertex scope-29
+# Plan on vertex
+c: Local Rearrange[tuple]{bytearray}(false) - scope-17	->	 scope-32
+|   |
+|   Project[bytearray][0] - scope-18
+|
+|---Load(file:/tmp/temp-994194982/tmp570752215:org.apache.pig.impl.io.InterStorage) - scope-35
+Tez vertex scope-31
+# Plan on vertex
+c: Local Rearrange[tuple]{bytearray}(false) - scope-19	->	 scope-32
+|   |
+|   Project[bytearray][0] - scope-20
+|
+|---b: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-12
+Tez vertex scope-32
+# Plan on vertex
+c: Store(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-24
+|
+|---c: New For Each(true,true)[tuple] - scope-23
+    |   |
+    |   Project[bag][1] - scope-21
+    |   |
+    |   Project[bag][2] - scope-22
+    |
+    |---c: Package(Packager)[tuple]{bytearray} - scope-16
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld
new file mode 100644
index 0000000..ff61c9f
--- /dev/null
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld
@@ -0,0 +1,73 @@
+#--------------------------------------------------
+# There are 3 DAGs in the session
+#--------------------------------------------------
+Tez DAG pig-0_scope-0	->	Tez DAG pig-1_scope-1,
+Tez DAG pig-1_scope-1	->	Tez DAG pig-2_scope-2,
+Tez DAG pig-2_scope-2
+
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-21	->	Tez vertex scope-24,
+Tez vertex scope-24
+
+Tez vertex scope-21
+# Plan on vertex
+a: Local Rearrange[tuple]{tuple}(true) - scope-23	->	 scope-24
+|   |
+|   Project[tuple][*] - scope-22
+|
+|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-24
+# Combine plan on edge <scope-21>
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
+# Plan on vertex
+Store(file:/tmp/temp-994194982/tmp570752215:org.apache.pig.impl.io.InterStorage) - scope-35
+|
+|---New For Each(true)[bag] - scope-27
+    |   |
+    |   Project[tuple][0] - scope-26
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-25
+#--------------------------------------------------
+# TEZ DAG plan: pig-1_scope-1
+#--------------------------------------------------
+Tez vertex scope-29
+
+Tez vertex scope-29
+# Plan on vertex
+a: Store(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-7
+|
+|---Load(file:/tmp/temp-994194982/tmp570752215:org.apache.pig.impl.io.InterStorage) - scope-36
+#--------------------------------------------------
+# TEZ DAG plan: pig-2_scope-2
+#--------------------------------------------------
+Tez vertex scope-31	->	Tez vertex scope-34,
+Tez vertex scope-33	->	Tez vertex scope-34,
+Tez vertex scope-34
+
+Tez vertex scope-31
+# Plan on vertex
+c: Local Rearrange[tuple]{bytearray}(false) - scope-13	->	 scope-34
+|   |
+|   Project[bytearray][0] - scope-14
+|
+|---Load(file:/tmp/temp-994194982/tmp570752215:org.apache.pig.impl.io.InterStorage) - scope-37
+Tez vertex scope-33
+# Plan on vertex
+c: Local Rearrange[tuple]{bytearray}(false) - scope-15	->	 scope-34
+|   |
+|   Project[bytearray][0] - scope-16
+|
+|---b: Load(file:///tmp/pigoutput/Dir1:org.apache.pig.builtin.PigStorage) - scope-8
+Tez vertex scope-34
+# Plan on vertex
+c: Store(file:///tmp/pigoutput/Dir2:org.apache.pig.builtin.PigStorage) - scope-20
+|
+|---c: New For Each(true,true)[tuple] - scope-19
+    |   |
+    |   Project[bag][1] - scope-17
+    |   |
+    |   Project[bag][2] - scope-18
+    |
+    |---c: Package(Packager)[tuple]{bytearray} - scope-12
diff --git a/test/org/apache/pig/tez/TestTezCompiler.java b/test/org/apache/pig/tez/TestTezCompiler.java
index 801c195..c7dec22 100644
--- a/test/org/apache/pig/tez/TestTezCompiler.java
+++ b/test/org/apache/pig/tez/TestTezCompiler.java
@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Properties;
+import java.util.Random;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.mapreduce.Job;
@@ -44,6 +45,7 @@
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
@@ -74,12 +76,14 @@
 
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
+        resetFileLocalizer();
         pc = new PigContext(new TezLocalExecType(), new Properties());
         FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
     }
 
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
+        resetFileLocalizer();
     }
 
     @Before
@@ -98,6 +102,13 @@
         TezPlanContainer.resetScope();
     }
 
+    private static void resetFileLocalizer() {
+        FileLocalizer.deleteTempFiles();
+        FileLocalizer.setInitialized(false);
+        // Use a fixed seed so that the generated temporary paths are deterministic
+        FileLocalizer.setR(new Random(1331L));
+    }
+
     @Test
     public void testStoreLoad() throws Exception {
         String query =
@@ -126,12 +137,72 @@
                 "d = cogroup a by $0, b by $0, c by $0;" +
                 "store d into 'file:///tmp/pigoutput/Dir5';";
 
-        // To get around difference in ordering of operators in plan due to JDK7 and JDK8
-        if (System.getProperties().getProperty("java.version").startsWith("1.8")) {
-            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
-        } else {
-            run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld");
-        }
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld");
+    }
+
+    @Test
+    public void testStoreLoadJoinMultiple() throws Exception {
+        // Case where data from different store/load pairs is used in a single join
+        String query =
+                "a = load 'file:///tmp/pigoutput/Dir1';" +
+                "b = filter a by $0 == 1;" +
+                "c = filter a by $0 == 2;" +
+                "store b into 'file:///tmp/pigoutput/Dir2';" +
+                "store c into 'file:///tmp/pigoutput/Dir3';" +
+                "d = load 'file:///tmp/pigoutput/Dir2';" +
+                "e = load 'file:///tmp/pigoutput/Dir3';" +
+                "f = join d by $0, e by $0;" +
+                "store f into 'file:///tmp/pigoutput/Dir5';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld");
+
+        resetScope();
+        query =
+                "a = load 'file:///tmp/pigoutput/Dir1';" +
+                "b = distinct a;" +
+                "c = group a by $0;" +
+                "store b into 'file:///tmp/pigoutput/Dir2';" +
+                "store c into 'file:///tmp/pigoutput/Dir3';" +
+                "d = load 'file:///tmp/pigoutput/Dir2';" +
+                "e = load 'file:///tmp/pigoutput/Dir3';" +
+                "f = load 'file:///tmp/pigoutput/Dir4';" +
+                "g = join d by $0, f by $0 using 'repl';" +
+                "h = join e by $0, f by $0 using 'repl';" +
+                "store g into 'file:///tmp/pigoutput/Dir4';" +
+                "store h into 'file:///tmp/pigoutput/Dir5';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld");
+    }
+
+    @Test
+    public void testStoreLoadSplit() throws Exception {
+        // Cases where segmenting into two DAGs is not straightforward due to Split.
+        // The Split operator is required in both segments.
+
+        resetFileLocalizer();
+        // Split operator as root vertex
+        String query =
+                "a = load 'file:///tmp/input';" +
+                "a1 = filter a by $0 == 5;" +
+                "store a1 into 'file:///tmp/pigoutput/Dir1';" +
+                "b = load 'file:///tmp/pigoutput/Dir1';" +
+                "c = join a by $0, b by $0;" +
+                "store c into 'file:///tmp/pigoutput/Dir2';";
+
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld");
+
+        // Split operator as intermediate vertex
+        query =
+                "a = load 'file:///tmp/input';" +
+                "a = distinct a;" +
+                "store a into 'file:///tmp/pigoutput/Dir1';" +
+                "b = load 'file:///tmp/pigoutput/Dir1';" +
+                "c = join a by $0, b by $0;" +
+                "store c into 'file:///tmp/pigoutput/Dir2';";
+
+        resetScope();
+        resetFileLocalizer();
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld");
     }
 
     @Test