PIG-3435: Custom Partitioner not working with MultiQueryOptimizer
git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.11@1518378 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 34abb99..34c7bd9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
BUG FIXES
+PIG-3435: Custom Partitioner not working with MultiQueryOptimizer (knoguchi via daijy)
+
PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
PIG-2507: Semicolon in paramenters for UDF results in parsing error (tnachen via daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
index 3c1c9b5..64f0ee1 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
@@ -115,6 +115,11 @@
+ " uses secondary key, do not merge it");
continue;
}
+ if (successor.getCustomPartitioner() != null) {
+ log.debug("Splittee " + successor.getOperatorKey().getId()
+ + " uses customPartitioner, do not merge it");
+ continue;
+ }
if (isMapOnly(successor)) {
if (isSingleLoadMapperPlan(successor.mapPlan)
&& isSinglePredecessor(successor)) {
diff --git a/test/org/apache/pig/test/TestMultiQueryCompiler.java b/test/org/apache/pig/test/TestMultiQueryCompiler.java
index cc00a57..f05c42b 100644
--- a/test/org/apache/pig/test/TestMultiQueryCompiler.java
+++ b/test/org/apache/pig/test/TestMultiQueryCompiler.java
@@ -1388,9 +1388,13 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
- MROperPlan mrp = checkMRPlan(pp, 1, 1, 1);
-
- MapReduceOper mrop = mrp.getRoots().get(0);
+ //merging is skipped due to PIG-3435 for now
+ //MROperPlan mrp = checkMRPlan(pp, 1, 1, 1);
+ //MapReduceOper mrop = mrp.getRoots().get(0);
+
+ //Instead of 1 merged mapreduce job, there will be two.
+ MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+ MapReduceOper mrop = mrp.getLeaves().get(0);
Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
} catch (Exception e) {
@@ -1398,6 +1402,45 @@
Assert.fail();
}
}
+
+ @Test
+ public void testMultiQueryDoNotMergeMRwithDifferentPartitioners() {
+
+ System.out.println("===== multi-query with intermediate stores =====");
+
+ try {
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b1 = FILTER a BY gid != 0;");
+ myPig.registerQuery("b2 = FILTER a BY uid > 100;");
+ myPig.registerQuery("c1 = GROUP b1 BY uname PARTITION BY " + SimpleCustomPartitioner.class.getName() + " PARALLEL 3;");
+ myPig.registerQuery("c2 = GROUP b2 BY uname PARALLEL 3;");
+ myPig.registerQuery("STORE c1 INTO 'output1';");
+ myPig.registerQuery("STORE c2 INTO 'output2';");
+
+ LogicalPlan lp = checkLogicalPlan(1, 2, 7);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 15);
+
+ MROperPlan mrp = checkMRPlan(pp, 1, 1, 2);
+
+ // since the first mapreduce job of mrp.getRoots().get(0)
+ // is the merge of splitter and splittee without custom partitioner (c2 above),
+ // second job should contain the custom partitioner
+ MapReduceOper mrop;
+ mrop = mrp.getRoots().get(0);
+ Assert.assertTrue(mrop.getCustomPartitioner() == null );
+ mrop = mrp.getLeaves().get(0);
+ Assert.assertTrue(mrop.getCustomPartitioner().equals(SimpleCustomPartitioner.class.getName()));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ }
+ }
+
// --------------------------------------------------------------------------
// Helper methods