PIG-4902: Fix UT failures on 0.16 branch: TestTezGraceParallelism, TestPigScriptParser
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1745385 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 77c8299..84299ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -139,6 +139,8 @@
BUG FIXES
+PIG-4902: Fix UT failures on 0.16 branch: TestTezGraceParallelism, TestPigScriptParser (daijy)
+
PIG-4909: PigStorage incompatible with commons-cli-1.3 (knoguchi)
PIG-4908: JythonFunction refers to Oozie launcher script absolute path (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
index 211f4c7..c8afd61 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
@@ -203,7 +203,7 @@
if (successor != null) {
// Map side combiner
TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
- if (!edge.combinePlan.isEmpty()) {
+ if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) {
if (successor.isDistinct()) {
factor = DEFAULT_DISTINCT_FACTOR;
} else {
diff --git a/test/org/apache/pig/test/TestPigScriptParser.java b/test/org/apache/pig/test/TestPigScriptParser.java
index 7824ff1..cf1b6b0 100644
--- a/test/org/apache/pig/test/TestPigScriptParser.java
+++ b/test/org/apache/pig/test/TestPigScriptParser.java
@@ -102,7 +102,7 @@
@Test
public void testDefineUDF() throws Exception {
- PigServer ps = new PigServer(ExecType.LOCAL);
+ PigServer ps = new PigServer(Util.getLocalTestMode());
String inputData[] = {
"dshfdskfwww.xyz.com/sportsjoadfjdslpdshfdskfwww.xyz.com/sportsjoadfjdsl" ,
"kas;dka;sd" ,
diff --git a/test/org/apache/pig/tez/TestTezGraceParallelism.java b/test/org/apache/pig/tez/TestTezGraceParallelism.java
index b6ecee1..a155277 100644
--- a/test/org/apache/pig/tez/TestTezGraceParallelism.java
+++ b/test/org/apache/pig/tez/TestTezGraceParallelism.java
@@ -117,15 +117,15 @@
Util.createLogAppender("testDecreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
try {
// DAG: 47 \
- // -> 49(join) -> 52(distinct) -> 61(group)
+ // -> 49(join) -> 52(distinct) -> 56(group)
// 48 /
// Parallelism at compile time:
// DAG: 47(1) \
- // -> 49(2) -> 52(20) -> 61(200)
+ // -> 49(2) -> 52(20) -> 56(200)
// 48(1) /
// However, when 49 finishes, the actual output of 49 only justify parallelism 1.
- // We adjust the parallelism for 61 to 100 based on this.
- // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 100 to 1.
+ // We adjust the parallelism for 56 to 7 based on this.
+ // At runtime, ShuffleVertexManager still kicks in and further reduces parallelism from 7 to 1.
//
pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -140,10 +140,10 @@
"('F',1349L)", "('M',1373L)"});
Util.checkQueryOutputsAfterSort(iter, expectedResults);
assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18"));
- assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-56 to 7"));
assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18"));
- assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7"));
+ assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-56 to 1 from 7"));
} finally {
Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
}
@@ -217,8 +217,8 @@
count++;
}
assertEquals(count, 20);
- assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
- assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10"));
+ assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80"));
+ assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 10"));
} finally {
Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
}
@@ -262,9 +262,9 @@
StringWriter writer = new StringWriter();
Util.createLogAppender("testJoinWithUnion", writer, PigGraceShuffleVertexManager.class);
try {
- // DAG: 29 -> 32 -> 41 \
- // -> 70 (vertex group) -> 61
- // 42 -> 45 -> 54 /
+ // DAG: 29 -> 32 -> 36 \
+ // -> 55 (vertex group) -> 51
+ // 37 -> 40 -> 44 /
pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("B = distinct A;");
pigServer.registerQuery("C = group B by name;");
@@ -280,8 +280,8 @@
count++;
}
assertEquals(count, 20);
- assertTrue(writer.toString().contains("time to set parallelism for scope-41"));
- assertTrue(writer.toString().contains("time to set parallelism for scope-54"));
+ assertTrue(writer.toString().contains("time to set parallelism for scope-36"));
+ assertTrue(writer.toString().contains("time to set parallelism for scope-44"));
} finally {
Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class);
}