PIG-5041: RoundRobinPartitioner is not deterministic when order of input records change (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1765312 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f00fc2..234a8c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,6 +48,8 @@
BUG FIXES
+PIG-5041: RoundRobinPartitioner is not deterministic when order of input records change (rohini)
+
PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini
PIG-5038: Pig Limit_2 e2e test failed with sort check (Konstantin_Harasov via rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
index fa29364..7e444d0 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
@@ -45,6 +45,7 @@
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -53,7 +54,6 @@
import org.apache.pig.builtin.JsonStorage;
import org.apache.pig.builtin.OrcStorage;
import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -590,7 +590,7 @@
// more union predecessors. Change it to SCATTER_GATHER
if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
edge.dataMovementType = DataMovementType.SCATTER_GATHER;
- edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.partitionerClass = HashValuePartitioner.class;
edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
new file mode 100644
index 0000000..ded3e86
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public int getPartition(Writable key, Writable value, int numPartitions) {
+ int hash = 17;
+ Tuple tuple;
+ if (value instanceof Tuple) {
+ // union optimizer turned off
+ tuple = (Tuple) value;
+ } else {
+ // union followed by order by or skewed join
+ tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+ }
+ if (tuple != null) {
+ for (Object o : tuple.getAll()) {
+ if (o != null) {
+ // Skip computing hashcode for bags.
+ // Order of elements in the map/bag may be different on each run
+ if (o instanceof DataBag) {
+ hash = 31 * hash;
+ } else if (o instanceof Map) {
+ // Including size of map as it is easily available
+ // Not doing for DataBag as some implementations actually
+ // iterate through all elements in the bag to get the size.
+ hash = 31 * hash + ((Map) o).size();
+ } else {
+ hash = 31 * hash + o.hashCode();
+ }
+ }
+ }
+ }
+ return (hash & Integer.MAX_VALUE) % numPartitions;
+ }
+
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
index a37eace..14c416c 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
@@ -40,9 +40,9 @@
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.builtin.TOBAG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
@@ -269,7 +269,7 @@
} else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
edge.inputClassName = UnorderedKVInput.class.getName();
- edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.partitionerClass = HashValuePartitioner.class;
}
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
diff --git a/src/org/apache/pig/builtin/RoundRobinPartitioner.java b/src/org/apache/pig/builtin/RoundRobinPartitioner.java
index b620554..775d7ea 100644
--- a/src/org/apache/pig/builtin/RoundRobinPartitioner.java
+++ b/src/org/apache/pig/builtin/RoundRobinPartitioner.java
@@ -22,6 +22,17 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be same. If the order of
+ * output records can vary on retries which is mostly the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output due to loss or duplication of data.
+ * Refer PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most cases.
+ */
+@Deprecated
public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
implements Configurable {
diff --git a/test/org/apache/pig/tez/TestTezJobExecution.java b/test/org/apache/pig/tez/TestTezJobExecution.java
index 97ede40..ee89306 100644
--- a/test/org/apache/pig/tez/TestTezJobExecution.java
+++ b/test/org/apache/pig/tez/TestTezJobExecution.java
@@ -30,7 +30,6 @@
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
-import org.apache.pig.builtin.RoundRobinPartitioner;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.test.Util;
import org.apache.pig.tools.pigstats.JobStats;
@@ -72,19 +71,17 @@
}
@Test
- public void testUnionParallelRoundRobinBatchSize() throws IOException {
+ public void testUnionParallelHashValuePartition() throws IOException {
String output = TEST_DIR + Path.SEPARATOR + "output1";
String query = "A = LOAD '" + INPUT_FILE + "';"
+ "B = LOAD '" + INPUT_FILE + "';"
+ "C = UNION A, B PARALLEL 2;"
+ "STORE C into '" + output + "';";
- pigServer.getPigContext().getProperties().setProperty(
- RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, "3");
pigServer.registerQuery(query);
String part0 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00000"));
String part1 = FileUtils.readFileToString(new File(output + Path.SEPARATOR + "part-v002-o000-r-00001"));
- assertEquals("1\n1\n1\n1\n1\n1\n", part0);
- assertEquals("2\n2\n2\n2\n2\n2\n", part1);
+ assertEquals("2\n2\n2\n2\n2\n2\n", part0);
+ assertEquals("1\n1\n1\n1\n1\n1\n", part1);
}
@Test
@@ -108,7 +105,7 @@
// Recovery is not disabled when there is auto parallelism. Should reuse AM application session
PigStats stats = PigRunner.run(args, listener);
assertTrue(stats.isSuccessful());
- assertEquals(listener.getJobsStarted().size(), 1);
+ assertEquals(1, listener.getJobsStarted().size());
Util.deleteFile(pigServer.getPigContext(), output1);
Util.deleteFile(pigServer.getPigContext(), output2);
@@ -122,7 +119,7 @@
scriptFile };
stats = PigRunner.run(args, listener);
assertTrue(stats.isSuccessful());
- assertEquals(listener.getJobsStarted().size(), 2);
+ assertEquals(2, listener.getJobsStarted().size());
}