PIG-3385: DISTINCT no longer uses custom partitioner

git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.11@1518377 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 7ea554e..34abb99 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
 
 BUG FIXES
 
+PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
+
 PIG-2507: Semicolon in paramenters for UDF results in parsing error (tnachen via daijy)
  
 PIG-3341: Strict datetime parsing and improve performance of loading datetime values (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
index 41e91bf..1170451 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
@@ -1761,6 +1761,7 @@
             addToMap(lr);
             
             blocking(op);
+            curMROp.customPartitioner = op.getCustomPartitioner();
             
             POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
             pkg.setKeyType(DataType.TUPLE);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
index ed2d39e..da77ceb 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
@@ -60,6 +60,17 @@
     private DataBag distinctBag = null;
     transient Iterator<Tuple> it;
 
+    // PIG-3385: Since GlobalRearrange is not used by PODistinct, the custom
+    // partitioner is passed through here
+    protected String customPartitioner;
+
+    public String getCustomPartitioner() {
+        return customPartitioner;
+    }
+    public void setCustomPartitioner(String customPartitioner) {
+        this.customPartitioner = customPartitioner;
+    }
+
     public PODistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp, inp);
     }
diff --git a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
index 5f33b07..78c0eff 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
@@ -1559,6 +1559,7 @@
     public void visit(LODistinct loDistinct) throws FrontendException {
         String scope = DEFAULT_SCOPE;
         PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelism());
+        physOp.setCustomPartitioner(loDistinct.getCustomPartitioner());
         physOp.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
         currentPlan.add(physOp);
         physOp.setResultType(DataType.BAG);
diff --git a/test/org/apache/pig/test/TestEvalPipeline2.java b/test/org/apache/pig/test/TestEvalPipeline2.java
index 39cf807..8a3b2b5 100644
--- a/test/org/apache/pig/test/TestEvalPipeline2.java
+++ b/test/org/apache/pig/test/TestEvalPipeline2.java
@@ -555,7 +555,7 @@
     // See PIG-282
     @Test
     public void testCustomPartitionerGroups() throws Exception{
-    	String[] input = {
+        String[] input = {
                 "1\t1",
                 "2\t1",
                 "3\t1",
@@ -567,37 +567,76 @@
         
         // It should be noted that for a map reduce job, the total number of partitions 
         // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein 
-        // we will get more than one reduce job so that we can use the partitioner. 	
+        // we will get more than one reduce job so that we can use the partitioner.
         // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
+        // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
+        // partition number is bigger than 1.
         //
-        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
         
         pigServer.store("B", "tmp_testCustomPartitionerGroups");
         
         new File("tmp_testCustomPartitionerGroups").mkdir();
         
-        // SimpleCustomPartitioner partitions as per the parity of the key
-        // Need to change this in SimpleCustomPartitioner is changed
         Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
         BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
- 	    String line = null;      	     
- 	    while((line = reader.readLine()) != null) {
- 	        String[] cols = line.split("\t");
- 	        int value = Integer.parseInt(cols[0]) % 2;
- 	        Assert.assertEquals(0, value);
- 	    }
- 	    Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+        String line = null;
+        while((line = reader.readLine()) != null) {
+            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
+        }
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
         reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
- 	    line = null;      	     
- 	    while((line = reader.readLine()) != null) {
- 	        String[] cols = line.split("\t");
- 	        int value = Integer.parseInt(cols[0]) % 2;
- 	        Assert.assertEquals(1, value);
- 	    } 
+        line = null;
+        int count=0;
+        while((line = reader.readLine()) != null) {
+            //all outputs should come to partition 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        Assert.assertEquals(4, count);
         Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
         Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
     }
-    
+
+    // See PIG-3385
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
+
+        new File("tmp_testCustomPartitionerDistinct").mkdir();
+
+        // SimpleCustomPartitioner3 simply partitions all inputs to the *second* reducer
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
+        String line = null;
+        while((line = reader.readLine()) != null) {
+            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
+        }
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
+        line = null;
+        int count=0;
+        while((line = reader.readLine()) != null) {
+            //all outputs should come to partition 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        Assert.assertEquals(4, count);
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
     // See PIG-282
     @Test
     public void testCustomPartitionerCross() throws Exception{
diff --git a/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java b/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
new file mode 100644
index 0000000..342f162
--- /dev/null
+++ b/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * Test-only partitioner that ignores the key and value: every record goes to
+ * partition 1 (the second reducer) when more than one partition exists, and
+ * to partition 0 otherwise.
+ */
+public class SimpleCustomPartitioner3 extends Partitioner<PigNullableWritable, Writable> {
+    @Override
+    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+        // A partition index must be smaller than numPartitions, so index 1 is
+        // only valid when there are at least two partitions.
+        return numPartitions > 1 ? 1 : 0;
+    }
+}