PIG-5207: BugFix e2e tests fail on spark (szita)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1796703 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b510c7..186b363 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,8 @@
  
 BUG FIXES
 
+PIG-5207: BugFix e2e tests fail on spark (szita)
+
 PIG-5194: HiveUDF fails with Spark exec type (szita)
 
 PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
index 4ee4591..16a2e27 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,8 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 
+import com.google.common.collect.HashBiMap;
+
 /**
  *
  * The base class for all types of physical plans.
@@ -304,6 +307,16 @@
             }
         }
 
+        //Fix order of edges in mToEdges lists
+        Map<PhysicalOperator, PhysicalOperator> invertedMatches = HashBiMap.create(matches).inverse();
+        for (PhysicalOperator newOp : clone.mToEdges.keySet()) {
+            List<PhysicalOperator> newList = clone.mToEdges.get(newOp);
+            if (newList.size() > 1) {
+                List<PhysicalOperator> originalList = this.mToEdges.get(invertedMatches.get(newOp));
+                Collections.sort(newList, new EdgeOrderHelper(originalList,invertedMatches));
+            }
+        }
+
         return clone;
     }
 
@@ -315,4 +328,21 @@
     {
         opmap = null;
     }
+
+
+    private static class EdgeOrderHelper implements Comparator<PhysicalOperator> {
+
+        private final Map<PhysicalOperator, PhysicalOperator> invertedMatches;
+        private final List<PhysicalOperator> originalList;
+
+        public EdgeOrderHelper(List<PhysicalOperator> originalList, Map<PhysicalOperator, PhysicalOperator> invertedMatches) {
+            this.originalList = originalList;
+            this.invertedMatches = invertedMatches;
+        }
+
+        @Override
+        public int compare(PhysicalOperator o1, PhysicalOperator o2) {
+            return originalList.indexOf(invertedMatches.get(o1)) - originalList.indexOf(invertedMatches.get(o2));
+        }
+    }
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
index 8c0e648..a404eef 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
@@ -296,8 +296,13 @@
             );
             newProj.setResultType(DataType.BAG);
 
-            PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
-            pplan.disconnect(udfInput, combineUdf);
+            for (PhysicalOperator originalUdfInput : pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) {
+                if (pplan.getPredecessors(originalUdfInput) != null) {
+                    pplan.trimAbove(originalUdfInput);
+                }
+                pplan.remove(originalUdfInput);
+            }
+
             pplan.add(newProj);
             pplan.connect(newProj, combineUdf);
             i++;