PIG-5207: BugFix e2e tests fail on spark (szita)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1796703 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b510c7..186b363 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,8 @@
BUG FIXES
+PIG-5207: BugFix e2e tests fail on spark (szita)
+
PIG-5194: HiveUDF fails with Spark exec type (szita)
PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
index 4ee4591..16a2e27 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,8 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
+import com.google.common.collect.HashBiMap;
+
/**
*
* The base class for all types of physical plans.
@@ -304,6 +307,16 @@
}
}
+ //Fix order of edges in mToEdges lists
+ Map<PhysicalOperator, PhysicalOperator> invertedMatches = HashBiMap.create(matches).inverse();
+ for (PhysicalOperator newOp : clone.mToEdges.keySet()) {
+ List<PhysicalOperator> newList = clone.mToEdges.get(newOp);
+ if (newList.size() > 1) {
+ List<PhysicalOperator> originalList = this.mToEdges.get(invertedMatches.get(newOp));
+ Collections.sort(newList, new EdgeOrderHelper(originalList,invertedMatches));
+ }
+ }
+
return clone;
}
@@ -315,4 +328,21 @@
{
opmap = null;
}
+
+
+    /** Orders a cloned successor list to match the edge order of the corresponding original list. */
+    private static class EdgeOrderHelper implements Comparator<PhysicalOperator> {
+        private final Map<PhysicalOperator, PhysicalOperator> invertedMatches;
+        private final List<PhysicalOperator> originalList;
+
+        public EdgeOrderHelper(List<PhysicalOperator> originalList, Map<PhysicalOperator, PhysicalOperator> invertedMatches) {
+            this.originalList = originalList;
+            this.invertedMatches = invertedMatches;
+        }
+
+        @Override
+        public int compare(PhysicalOperator o1, PhysicalOperator o2) {
+            // Integer.compare avoids the subtraction-comparator overflow anti-pattern.
+            return Integer.compare(originalList.indexOf(invertedMatches.get(o1)), originalList.indexOf(invertedMatches.get(o2)));
+        }
+    }
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
index 8c0e648..a404eef 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
@@ -296,8 +296,13 @@
);
newProj.setResultType(DataType.BAG);
- PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
- pplan.disconnect(udfInput, combineUdf);
+ for (PhysicalOperator originalUdfInput : pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) {
+ if (pplan.getPredecessors(originalUdfInput) != null) {
+ pplan.trimAbove(originalUdfInput);
+ }
+ pplan.remove(originalUdfInput);
+ }
+
pplan.add(newProj);
pplan.connect(newProj, combineUdf);
i++;