PIG-3435: Custom Partitioner not working with MultiQueryOptimizer

git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.11@1518378 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 34abb99..34c7bd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
 
 BUG FIXES
 
+PIG-3435: Custom Partitioner not working with MultiQueryOptimizer (knoguchi via daijy)
+
 PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
 
 PIG-2507: Semicolon in paramenters for UDF results in parsing error (tnachen via daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
index 3c1c9b5..64f0ee1 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
@@ -115,6 +115,11 @@
                         + " uses secondary key, do not merge it");
                 continue;
             }
+            if (successor.getCustomPartitioner() != null) {
+                log.debug("Splittee " + successor.getOperatorKey().getId()
+                        + " uses customPartitioner, do not merge it");
+                continue;
+            }
             if (isMapOnly(successor)) {
                 if (isSingleLoadMapperPlan(successor.mapPlan)
                         && isSinglePredecessor(successor)) {                    
diff --git a/test/org/apache/pig/test/TestMultiQueryCompiler.java b/test/org/apache/pig/test/TestMultiQueryCompiler.java
index cc00a57..f05c42b 100644
--- a/test/org/apache/pig/test/TestMultiQueryCompiler.java
+++ b/test/org/apache/pig/test/TestMultiQueryCompiler.java
@@ -1388,9 +1388,13 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
 
-            MROperPlan mrp = checkMRPlan(pp, 1, 1, 1); 
-            
-            MapReduceOper mrop = mrp.getRoots().get(0);
+            //merging is skipped due to PIG-3435 for now
+            //MROperPlan mrp = checkMRPlan(pp, 1, 1, 1);
+            //MapReduceOper mrop = mrp.getRoots().get(0);
+
+            //Instead of 1 merged mapreduce job, there will be two.
+            MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+            MapReduceOper mrop = mrp.getLeaves().get(0);
             Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
 
         } catch (Exception e) {
@@ -1398,6 +1402,45 @@
             Assert.fail();
         } 
     }    
+
+    @Test
+    public void testMultiQueryDoNotMergeMRwithDifferentPartitioners() {
+
+        System.out.println("===== multi-query with intermediate stores =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'passwd' " +
+                                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b1 = FILTER a BY gid != 0;");
+            myPig.registerQuery("b2 = FILTER a BY uid > 100;");
+            myPig.registerQuery("c1 = GROUP b1 BY uname PARTITION BY " + SimpleCustomPartitioner.class.getName() + " PARALLEL 3;");
+            myPig.registerQuery("c2 = GROUP b2 BY uname PARALLEL 3;");
+            myPig.registerQuery("STORE c1 INTO 'output1';");
+            myPig.registerQuery("STORE c2 INTO 'output2';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 2, 7);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15);
+
+            MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+
+            // since the first mapreduce job of mrp.getRoots().get(0)
+            // is the merge of splitter and splittee without custom partitioner (c2 above),
+            // second job should contain the custom partitioner
+            MapReduceOper mrop;
+            mrop = mrp.getRoots().get(0);
+            Assert.assertTrue(mrop.getCustomPartitioner() == null );
+            mrop = mrp.getLeaves().get(0);
+            Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        }
+    }
+
     
     // --------------------------------------------------------------------------
     // Helper methods