PIG-4902: Fix UT failures on 0.16 branch: TestTezGraceParallelism, TestPigScriptParser

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1745385 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 77c8299..84299ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -139,6 +139,8 @@
 
 BUG FIXES
 
+PIG-4902: Fix UT failures on 0.16 branch: TestTezGraceParallelism, TestPigScriptParser (daijy)
+
 PIG-4909: PigStorage incompatible with commons-cli-1.3 (knoguchi)
 
 PIG-4908: JythonFunction refers to Oozie launcher script absolute path (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
index 211f4c7..c8afd61 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
@@ -203,7 +203,7 @@
             if (successor != null) {
                 // Map side combiner
                 TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
-                if (!edge.combinePlan.isEmpty()) {
+                if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) {
                     if (successor.isDistinct()) {
                         factor = DEFAULT_DISTINCT_FACTOR;
                     } else {
diff --git a/test/org/apache/pig/test/TestPigScriptParser.java b/test/org/apache/pig/test/TestPigScriptParser.java
index 7824ff1..cf1b6b0 100644
--- a/test/org/apache/pig/test/TestPigScriptParser.java
+++ b/test/org/apache/pig/test/TestPigScriptParser.java
@@ -102,7 +102,7 @@
 
     @Test
     public void testDefineUDF() throws Exception {
-        PigServer ps = new PigServer(ExecType.LOCAL);
+        PigServer ps = new PigServer(Util.getLocalTestMode());
         String inputData[] = {
                 "dshfdskfwww.xyz.com/sportsjoadfjdslpdshfdskfwww.xyz.com/sportsjoadfjdsl" ,
                 "kas;dka;sd" ,
diff --git a/test/org/apache/pig/tez/TestTezGraceParallelism.java b/test/org/apache/pig/tez/TestTezGraceParallelism.java
index b6ecee1..a155277 100644
--- a/test/org/apache/pig/tez/TestTezGraceParallelism.java
+++ b/test/org/apache/pig/tez/TestTezGraceParallelism.java
@@ -117,15 +117,15 @@
         Util.createLogAppender("testDecreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
         try {
             // DAG: 47 \
-            //           -> 49(join) -> 52(distinct) -> 61(group)
+            //           -> 49(join) -> 52(distinct) -> 56(group)
             //      48 /
             // Parallelism at compile time:
             // DAG: 47(1) \
-            //              -> 49(2) -> 52(20) -> 61(200)
+            //              -> 49(2) -> 52(20) -> 56(200)
             //      48(1) /
             // However, when 49 finishes, the actual output of 49 only justify parallelism 1.
-            // We adjust the parallelism for 61 to 100 based on this.
-            // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 100 to 1.
+            // We adjust the parallelism for 56 to 7 based on this.
+            // At runtime, ShuffleVertexManager still kick in and further reduce parallelism from 7 to 1.
             //
             pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
             pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -140,10 +140,10 @@
                             "('F',1349L)", "('M',1373L)"});
             Util.checkQueryOutputsAfterSort(iter, expectedResults);
             assertTrue(writer.toString().contains("Initialize parallelism for scope-52 to 18"));
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-61 to 7"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-56 to 7"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
             assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-52 to 1 from 18"));
-            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-61 to 1 from 7"));
+            assertTrue(writer.toString().contains("Reduce auto parallelism for vertex: scope-56 to 1 from 7"));
         } finally {
             Util.removeLogAppender("testDecreaseParallelism", PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
         }
@@ -217,8 +217,8 @@
                 count++;
             }
             assertEquals(count, 20);
-            assertTrue(writer.toString().contains("All predecessors for scope-84 are finished, time to set parallelism for scope-85"));
-            assertTrue(writer.toString().contains("Initialize parallelism for scope-85 to 10"));
+            assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 10"));
         } finally {
             Util.removeLogAppender("testJoinWithDifferentDepth", PigGraceShuffleVertexManager.class);
         }
@@ -262,9 +262,9 @@
         StringWriter writer = new StringWriter();
         Util.createLogAppender("testJoinWithUnion", writer, PigGraceShuffleVertexManager.class);
         try {
-            // DAG: 29 -> 32 -> 41 \
-            //                       -> 70 (vertex group) -> 61
-            //      42 -> 45 -> 54 /
+            // DAG: 29 -> 32 -> 36 \
+            //                       -> 55 (vertex group) -> 51
+            //      37 -> 40 -> 44 /
             pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
             pigServer.registerQuery("B = distinct A;");
             pigServer.registerQuery("C = group B by name;");
@@ -280,8 +280,8 @@
                 count++;
             }
             assertEquals(count, 20);
-            assertTrue(writer.toString().contains("time to set parallelism for scope-41"));
-            assertTrue(writer.toString().contains("time to set parallelism for scope-54"));
+            assertTrue(writer.toString().contains("time to set parallelism for scope-36"));
+            assertTrue(writer.toString().contains("time to set parallelism for scope-44"));
         } finally {
             Util.removeLogAppender("testJoinWithUnion", PigGraceShuffleVertexManager.class);
         }