ASTERIXDB-1127: fix ExtractCommonOperatorsRule.

Change-Id: I16933a4b72432b5fbd523ca80ce6426f6b6743a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/691
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index d130d4c..ac2ae5c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -233,7 +233,7 @@
                     AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp);
                     if (pc != null) {
                         opConstraint = pc;
-                    } else if (opInputs == null || opInputs.size() == 0) {
+                    } else if ((opInputs == null || opInputs.size() == 0) && finalPass) {
                         opConstraint = new AlgebricksCountPartitionConstraint(1);
                     }
                 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 7b04bd5..2a28d2e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -35,7 +35,7 @@
 
 public class HeuristicOptimizer {
 
-    public static PhysicalOperatorTag[] hyraxOperators = new PhysicalOperatorTag[] {
+    public static PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] {
             PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
             PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
             PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
@@ -44,8 +44,8 @@
             PhysicalOperatorTag.UNION_ALL };
     public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
 
-    public static boolean isHyraxOp(PhysicalOperatorTag opTag) {
-        for (PhysicalOperatorTag t : hyraxOperators) {
+    public static boolean isHyracksOp(PhysicalOperatorTag opTag) {
+        for (PhysicalOperatorTag t : hyracksOperators) {
             if (t == opTag) {
                 return true;
             }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index f13187f..3b31f6d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
@@ -67,8 +68,9 @@
                 && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
-        if (!roots.contains(op))
+        if (!roots.contains(op)) {
             roots.add(new MutableObject<ILogicalOperator>(op));
+        }
         return false;
     }
 
@@ -89,10 +91,12 @@
                 topDownMaterialization(roots);
                 genCandidates(context);
                 removeTrivialShare();
-                if (equivalenceClasses.size() > 0)
+                if (equivalenceClasses.size() > 0) {
                     changed = rewrite(context);
-                if (!rewritten)
+                }
+                if (!rewritten) {
                     rewritten = changed;
+                }
                 equivalenceClasses.clear();
                 childrenToParents.clear();
                 opToCandidateInputs.clear();
@@ -110,22 +114,27 @@
             for (int i = candidates.size() - 1; i >= 0; i--) {
                 Mutable<ILogicalOperator> opRef = candidates.get(i);
                 AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue();
-                if (aop.getOperatorTag() == LogicalOperatorTag.EXCHANGE)
+                if (aop.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                     aop = (AbstractLogicalOperator) aop.getInputs().get(0).getValue();
-                if (aop.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE)
+                }
+                if (aop.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
                     candidates.remove(i);
+                }
             }
         }
-        for (int i = equivalenceClasses.size() - 1; i >= 0; i--)
-            if (equivalenceClasses.get(i).size() < 2)
+        for (int i = equivalenceClasses.size() - 1; i >= 0; i--) {
+            if (equivalenceClasses.get(i).size() < 2) {
                 equivalenceClasses.remove(i);
+            }
+        }
     }
 
     private boolean rewrite(IOptimizationContext context) throws AlgebricksException {
         boolean changed = false;
         for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) {
-            if (rewriteForOneEquivalentClass(members, context))
+            if (rewriteForOneEquivalentClass(members, context)) {
                 changed = true;
+            }
         }
         return changed;
     }
@@ -191,11 +200,13 @@
             List<LogicalVariable> liveVarsNew = new ArrayList<LogicalVariable>();
             VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
             ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
-            for (LogicalVariable liveVar : liveVarsNew)
+            for (LogicalVariable liveVar : liveVarsNew) {
                 assignExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar)));
+            }
             for (Mutable<ILogicalOperator> ref : group) {
-                if (ref.equals(candidate))
+                if (ref.equals(candidate)) {
                     continue;
+                }
                 ArrayList<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
                 Map<LogicalVariable, LogicalVariable> variableMappingBack = new HashMap<LogicalVariable, LogicalVariable>();
                 IsomorphismUtilities.mapVariablesTopDown(ref.getValue(), candidate.getValue(), variableMappingBack);
@@ -227,20 +238,13 @@
                 for (Mutable<ILogicalOperator> parentOpRef : parentOpList) {
                     AbstractLogicalOperator parentOp = (AbstractLogicalOperator) parentOpRef.getValue();
                     int index = parentOp.getInputs().indexOf(ref);
-                    if (parentOp.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
-                        AbstractLogicalOperator parentOpNext = (AbstractLogicalOperator) childrenToParents
-                                .get(parentOpRef).get(0).getValue();
-                        if (parentOpNext.isMap()) {
-                            index = parentOpNext.getInputs().indexOf(parentOpRef);
-                            parentOp = parentOpNext;
-                        }
-                    }
-
                     ILogicalOperator childOp = parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator
                             : projectOperator;
-                    if (parentOp.isMap()) {
+                    if (!HeuristicOptimizer.isHyracksOp(parentOp.getPhysicalOperator().getOperatorTag())) {
                         parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(childOp));
                     } else {
+                        // If the parent operator is a hyracks operator,
+                        // an extra one-to-one exchange is needed.
                         AbstractLogicalOperator exchg = new ExchangeOperator();
                         exchg.setPhysicalOperator(new OneToOneExchangePOperator());
                         exchg.setExecutionMode(childOp.getExecutionMode());
@@ -270,16 +274,19 @@
                 if (candidates.size() > 0) {
                     for (Mutable<ILogicalOperator> opRef : candidates) {
                         List<Mutable<ILogicalOperator>> refs = childrenToParents.get(opRef);
-                        if (refs != null)
+                        if (refs != null) {
                             currentLevelOpRefs.addAll(refs);
+                        }
                     }
                 }
-                if (currentLevelOpRefs.size() == 0)
+                if (currentLevelOpRefs.size() == 0) {
                     continue;
+                }
                 candidatesGrow(currentLevelOpRefs, candidates);
             }
-            if (currentLevelOpRefs.size() == 0)
+            if (currentLevelOpRefs.size() == 0) {
                 break;
+            }
             prune(context);
         }
         if (equivalenceClasses.size() < 1 && previousEquivalenceClasses.size() > 0) {
@@ -301,8 +308,9 @@
                 }
                 opRefList.add(op);
             }
-            if (op.getValue().getInputs().size() == 0)
+            if (op.getValue().getInputs().size() == 0) {
                 candidates.add(op);
+            }
         }
         if (equivalenceClasses.size() > 0) {
             equivalenceClasses.get(0).addAll(candidates);
@@ -344,10 +352,12 @@
                     }
                 }
             }
-            if (!validCandidate)
+            if (!validCandidate) {
                 continue;
-            if (!candidates.contains(op))
+            }
+            if (!candidates.contains(op)) {
                 candidates.add(op);
+            }
         }
     }
 
@@ -361,8 +371,9 @@
         equivalenceClasses.clear();
         for (List<Mutable<ILogicalOperator>> candidates : previousEquivalenceClasses) {
             boolean[] reserved = new boolean[candidates.size()];
-            for (int i = 0; i < reserved.length; i++)
+            for (int i = 0; i < reserved.length; i++) {
                 reserved[i] = false;
+            }
             for (int i = candidates.size() - 1; i >= 0; i--) {
                 if (reserved[i] == false) {
                     List<Mutable<ILogicalOperator>> equivalentClass = new ArrayList<Mutable<ILogicalOperator>>();