PIG-3385: DISTINCT no longer uses custom partitioner
git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.11@1518377 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 7ea554e..34abb99 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@
BUG FIXES
+PIG-3385: DISTINCT no longer uses custom partitioner (knoguchi via daijy)
+
PIG-2507: Semicolon in paramenters for UDF results in parsing error (tnachen via daijy)
PIG-3341: Strict datetime parsing and improve performance of loading datetime values (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
index 41e91bf..1170451 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
@@ -1761,6 +1761,7 @@
addToMap(lr);
blocking(op);
+ curMROp.customPartitioner = op.getCustomPartitioner();
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
pkg.setKeyType(DataType.TUPLE);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
index ed2d39e..da77ceb 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
@@ -60,6 +60,17 @@
private DataBag distinctBag = null;
transient Iterator<Tuple> it;
+ // PIG-3385: Since GlobalRearrange is not used by PODistinct, the custom
+ // partitioner is passed through here instead.
+ protected String customPartitioner;
+
+ public String getCustomPartitioner() { // returns the partitioner value last stored via setCustomPartitioner
+ return customPartitioner;
+ }
+ public void setCustomPartitioner(String customPartitioner) { // stores the value as-is; LogToPhyTranslationVisitor passes loDistinct.getCustomPartitioner()
+ this.customPartitioner = customPartitioner;
+ }
+
public PODistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
}
diff --git a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
index 5f33b07..78c0eff 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
@@ -1559,6 +1559,7 @@
public void visit(LODistinct loDistinct) throws FrontendException {
String scope = DEFAULT_SCOPE;
PODistinct physOp = new PODistinct(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loDistinct.getRequestedParallelism());
+ physOp.setCustomPartitioner(loDistinct.getCustomPartitioner());
physOp.addOriginalLocation(loDistinct.getAlias(), loDistinct.getLocation());
currentPlan.add(physOp);
physOp.setResultType(DataType.BAG);
diff --git a/test/org/apache/pig/test/TestEvalPipeline2.java b/test/org/apache/pig/test/TestEvalPipeline2.java
index 39cf807..8a3b2b5 100644
--- a/test/org/apache/pig/test/TestEvalPipeline2.java
+++ b/test/org/apache/pig/test/TestEvalPipeline2.java
@@ -555,7 +555,7 @@
// See PIG-282
@Test
public void testCustomPartitionerGroups() throws Exception{
- String[] input = {
+ String[] input = {
"1\t1",
"2\t1",
"3\t1",
@@ -567,37 +567,76 @@
// It should be noted that for a map reduce job, the total number of partitions
// is the same as the number of reduce tasks for the job. Hence we need to find a case wherein
- // we will get more than one reduce job so that we can use the partitioner.
+ // we will get more than one reduce job so that we can use the partitioner.
// The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
+ // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
+ // partition number is bigger than 1.
//
- pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner parallel 2;");
+ pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
pigServer.store("B", "tmp_testCustomPartitionerGroups");
new File("tmp_testCustomPartitionerGroups").mkdir();
- // SimpleCustomPartitioner partitions as per the parity of the key
- // Need to change this in SimpleCustomPartitioner is changed
Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
- String line = null;
- while((line = reader.readLine()) != null) {
- String[] cols = line.split("\t");
- int value = Integer.parseInt(cols[0]) % 2;
- Assert.assertEquals(0, value);
- }
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+ String line = null;
+ while((line = reader.readLine()) != null) {
+ Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
+ }
+ Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
- line = null;
- while((line = reader.readLine()) != null) {
- String[] cols = line.split("\t");
- int value = Integer.parseInt(cols[0]) % 2;
- Assert.assertEquals(1, value);
- }
+ line = null;
+ int count=0;
+ while((line = reader.readLine()) != null) {
+ //all outputs should come to partition 1 (with SimpleCustomPartitioner3)
+ count++;
+ }
+ Assert.assertEquals(4, count);
Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+ Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
}
-
+
+    // See PIG-3385: DISTINCT ... PARTITION BY must route records through the custom partitioner.
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
+
+        new File("tmp_testCustomPartitionerDistinct").mkdir();
+
+        // SimpleCustomPartitioner3 sends all inputs to the *second* reducer, so part-r-00000 must be empty
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
+        try {
+            Assert.assertNull("Partition 0 should be empty. Most likely Custom Partitioner was not used.", reader.readLine());
+        } finally { reader.close(); } // close even when the assertion fails
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
+        int count = 0;
+        try {
+            // all outputs should come to partition 1 (with SimpleCustomPartitioner3)
+            while (reader.readLine() != null) {
+                count++;
+            }
+        } finally { reader.close(); }
+        Assert.assertEquals(4, count); // 5 input rows, one of which is a duplicate
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
// See PIG-282
@Test
public void testCustomPartitionerCross() throws Exception{
diff --git a/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java b/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
new file mode 100644
index 0000000..342f162
--- /dev/null
+++ b/test/org/apache/pig/test/utils/SimpleCustomPartitioner3.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+public class SimpleCustomPartitioner3 extends Partitioner<PigNullableWritable, Writable> {
+    @Override // Test-only: routes every record to partition 1 (the second reducer) when it exists, else to partition 0.
+    public int getPartition(PigNullableWritable key, Writable value, int numPartitions) {
+        return numPartitions > 1 ? 1 : 0; // index 1 is only valid with 2+ partitions; result must stay in [0, numPartitions)
+    }
+}