PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure

git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.15@1682957 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fa912b7..fc02e4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -74,6 +74,8 @@
  
 BUG FIXES
 
+PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure (daijy)
+
 PIG-4571: TestPigRunner.testGetHadoopCounters fail on Windows (daijy)
 
 PIG-4541: Skewed full outer join does not return records if any relation is empty. Outer join does not
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 9e66959..b743c1e 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -430,7 +430,7 @@
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-        if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
+        if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
             return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
                     edge.dataSourceType, edge.schedulingType, out, in);
@@ -671,11 +671,15 @@
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
-                // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
-                // to decrease/increase parallelism of sorting vertex dynamically
-                // based on the numQuantiles calculated by sample aggregation vertex
-                vmPluginName = PartitionerDefinedVertexManager.class.getName();
-                log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
+                if (tezOp.getVertexParallelism()==-1 && (
+                        tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
+                        tezOp.isSkewedJoin() &&getPlan().getPredecessors(tezOp).size()==2)) {
+                    // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
+                    // to decrease/increase parallelism of sorting vertex dynamically
+                    // based on the numQuantiles calculated by sample aggregation vertex
+                    vmPluginName = PartitionerDefinedVertexManager.class.getName();
+                    log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
+                }
             } else {
                 boolean containScatterGather = false;
                 boolean containCustomPartitioner = false;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index 5eaa84d..a543d62 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -1639,6 +1639,7 @@
             List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             List<Boolean> flat = new ArrayList<Boolean>();
 
+            boolean containsOuter = false;
             // Add corresponding POProjects
             for (int i=0; i < 2; i++) {
                 ep = new PhysicalPlan();
@@ -1651,6 +1652,7 @@
                 if (!inner[i]) {
                     // Add an empty bag for outer join
                     CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+                    containsOuter = true;
                 }
                 flat.add(true);
             }
@@ -1681,14 +1683,16 @@
 
             POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
             for (int i = 0; i <= 2; i++) {
-                // We need to send sample to left relation partitioner vertex, right relation load vertex,
-                // and join vertex (IsFirstReduceOfKey in join vertex need sample file as well)
-                joinJobs[i].setSampleOperator(sampleJobPair.first);
-
-                // Configure broadcast edges for distribution map
-                edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
-                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
-                sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+                if (i != 2 || containsOuter) {
+                    // Send the sample to the left relation partitioner vertex and the right relation load vertex;
+                    // the join vertex (i == 2) receives it only for outer joins, where IsFirstReduceOfKey needs the sample file
+                    joinJobs[i].setSampleOperator(sampleJobPair.first);
+    
+                    // Configure broadcast edges for distribution map
+                    edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+                    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+                    sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+                }
 
                 // Configure skewed partitioner for join
                 if (i != 2) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
index 07b22f1..b878bd0 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
@@ -148,7 +148,12 @@
                             parallelism = tezOp.getEstimatedParallelism();
                         }
                         if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
-                            if (!overrideRequestedParallelism) {
+                            boolean additionalEdge = false;
+                            if (tezOp.isGlobalSort() && getPlan().getPredecessors(tezOp).size() != 1 ||
+                                    tezOp.isSkewedJoin() && getPlan().getPredecessors(tezOp).size() != 2) {
+                                additionalEdge = true;
+                            }
+                            if (!overrideRequestedParallelism && !additionalEdge) {
                                 incrementTotalParallelism(tezOp, parallelism);
                                 // PartitionerDefinedVertexManager will determine parallelism.
                                 // So call setVertexParallelism with -1
@@ -168,6 +173,7 @@
                                                 ParallelConstantVisitor visitor =
                                                         new ParallelConstantVisitor(partitionerPred.plan, parallelism);
                                                 visitor.visit();
+                                                partitionerPred.setNeedEstimatedQuantile(false);
                                                 break;
                                             }
                                         }
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
index d899948..7cb17f9 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-25	->	Tez vertex scope-29,Tez vertex scope-38,Tez vertex scope-48,
-Tez vertex scope-38	->	Tez vertex scope-29,Tez vertex scope-48,Tez vertex scope-52,
+Tez vertex scope-38	->	Tez vertex scope-29,Tez vertex scope-48,
 Tez vertex scope-48	->	Tez vertex scope-52,
 Tez vertex scope-29	->	Tez vertex scope-52,
 Tez vertex scope-52
@@ -55,7 +55,7 @@
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-38
 # Plan on vertex
-POValueOutputTez - scope-47	->	 [scope-29, scope-48, scope-52]
+POValueOutputTez - scope-47	->	 [scope-29, scope-48]
 |
 |---New For Each(false)[tuple] - scope-46
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
index c9f84ed..8b4bfc6 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-27	->	Tez vertex scope-36,Tez vertex scope-46,
-Tez vertex scope-36	->	Tez vertex scope-28,Tez vertex scope-46,Tez vertex scope-50,
+Tez vertex scope-36	->	Tez vertex scope-28,Tez vertex scope-46,
 Tez vertex scope-46	->	Tez vertex scope-50,
 Tez vertex scope-28	->	Tez vertex scope-50,
 Tez vertex scope-50
@@ -43,7 +43,7 @@
                 |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-36
 # Plan on vertex
-POValueOutputTez - scope-45	->	 [scope-28, scope-46, scope-50]
+POValueOutputTez - scope-45	->	 [scope-28, scope-46]
 |
 |---New For Each(false)[tuple] - scope-44
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
index 86666cd..544c43a 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
@@ -7,7 +7,7 @@
 Tez vertex scope-30	->	Tez vertex scope-34,Tez vertex scope-36,
 Tez vertex scope-34	->	Tez vertex scope-36,
 Tez vertex scope-36	->	Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48	->	Tez vertex scope-40,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48	->	Tez vertex scope-40,Tez vertex scope-58,
 Tez vertex scope-58	->	Tez vertex scope-62,
 Tez vertex scope-40	->	Tez vertex scope-62,
 Tez vertex scope-62
@@ -67,7 +67,7 @@
             |---POShuffledValueInputTez - scope-37	<-	 [scope-30, scope-34]
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57	->	 [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57	->	 [scope-40, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
index 4f2dabc..ab9e443 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-30	->	Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48	->	Tez vertex scope-40,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48	->	Tez vertex scope-40,Tez vertex scope-58,
 Tez vertex scope-58	->	Tez vertex scope-62,
 Tez vertex scope-40	->	Tez vertex scope-62,
 Tez vertex scope-62
@@ -71,7 +71,7 @@
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57	->	 [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57	->	 [scope-40, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
index edb072a..fe6e2d3 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-30	->	Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48	->	Tez vertex scope-37,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48	->	Tez vertex scope-37,Tez vertex scope-58,
 Tez vertex scope-58	->	Tez vertex scope-62,
 Tez vertex scope-31	->	Tez vertex scope-35,Tez vertex scope-37,
 Tez vertex scope-35	->	Tez vertex scope-37,
@@ -45,7 +45,7 @@
                 |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57	->	 [scope-37, scope-58, scope-62]
+POValueOutputTez - scope-57	->	 [scope-37, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
index 27442e1..fc14096 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
@@ -5,7 +5,7 @@
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
 Tez vertex scope-30	->	Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48	->	Tez vertex scope-31,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48	->	Tez vertex scope-31,Tez vertex scope-58,
 Tez vertex scope-58	->	Tez vertex scope-62,
 Tez vertex scope-31	->	Tez vertex scope-62,
 Tez vertex scope-62
@@ -43,7 +43,7 @@
                 |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-48
 # Plan on vertex
-POValueOutputTez - scope-57	->	 [scope-31, scope-58, scope-62]
+POValueOutputTez - scope-57	->	 [scope-31, scope-58]
 |
 |---New For Each(false)[tuple] - scope-56
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
index cfc4670..e94adb7 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
@@ -7,7 +7,7 @@
 Tez vertex scope-114	->	Tez vertex scope-116,
 Tez vertex scope-115	->	Tez vertex scope-116,
 Tez vertex scope-116	->	Tez vertex scope-128,Tez vertex scope-138,
-Tez vertex scope-128	->	Tez vertex scope-120,Tez vertex scope-138,Tez vertex scope-142,
+Tez vertex scope-128	->	Tez vertex scope-120,Tez vertex scope-138,
 Tez vertex scope-138	->	Tez vertex scope-142,
 Tez vertex scope-120	->	Tez vertex scope-142,
 Tez vertex scope-142
@@ -65,7 +65,7 @@
             |---POShuffledValueInputTez - scope-117	<-	 [scope-114, scope-115]
 Tez vertex scope-128
 # Plan on vertex
-POValueOutputTez - scope-137	->	 [scope-120, scope-138, scope-142]
+POValueOutputTez - scope-137	->	 [scope-120, scope-138]
 |
 |---New For Each(false)[tuple] - scope-136
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
index 6c94a1c..0329d0b 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
@@ -7,7 +7,7 @@
 Tez vertex scope-29	->	Tez vertex group scope-63,Tez vertex group scope-64,
 Tez vertex scope-30	->	Tez vertex group scope-63,Tez vertex group scope-64,
 Tez vertex group scope-64	->	Tez vertex scope-43,
-Tez vertex scope-43	->	Tez vertex scope-35,Tez vertex scope-53,Tez vertex scope-57,
+Tez vertex scope-43	->	Tez vertex scope-35,Tez vertex scope-53,
 Tez vertex group scope-63	->	Tez vertex scope-53,
 Tez vertex scope-53	->	Tez vertex scope-57,
 Tez vertex scope-35	->	Tez vertex scope-57,
@@ -79,7 +79,7 @@
 # No plan on vertex group
 Tez vertex scope-43
 # Plan on vertex
-POValueOutputTez - scope-52	->	 [scope-35, scope-53, scope-57]
+POValueOutputTez - scope-52	->	 [scope-35, scope-53]
 |
 |---New For Each(false)[tuple] - scope-51
     |   |
diff --git a/test/org/apache/pig/tez/TestTezAutoParallelism.java b/test/org/apache/pig/tez/TestTezAutoParallelism.java
index 0d689eb..fee4bbd 100644
--- a/test/org/apache/pig/tez/TestTezAutoParallelism.java
+++ b/test/org/apache/pig/tez/TestTezAutoParallelism.java
@@ -246,6 +246,58 @@
     }
 
     @Test
+    public void testSkewedFullJoinIncreaseParallelism() throws IOException{
+        // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency,
+        // which prevent it changing parallelism
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
+        pigServer.store("C", "output6");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        assertEquals(files.length, 5);
+    }
+
+    @Test
+    public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
+        // A skewed join keeps its initial parallelism here: the join vertex has a broadcast (scalar) dependency,
+        // which prevents its parallelism from changing
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+        pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
+        pigServer.registerQuery("D = load 'org.apache.pig.tez.TestTezAutoParallelism_1' as (name:chararray, age:int);");
+        pigServer.registerQuery("E = group D all;");
+        pigServer.registerQuery("F = foreach E generate COUNT(D) as count;");
+        pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
+        pigServer.store("G", "output7");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith("part")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        assertEquals(files.length, 4);
+    }
+
+    @Test
     public void testIncreaseIntermediateParallelism1() throws IOException{
         // User specified parallelism is overriden for intermediate step
         String outputDir = "/tmp/testIncreaseIntermediateParallelism";