PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure
git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.15@1682957 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fa912b7..fc02e4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -74,6 +74,8 @@
BUG FIXES
+PIG-4580: Fix TestTezAutoParallelism.testSkewedJoinIncreaseParallelism test failure (daijy)
+
PIG-4571: TestPigRunner.testGetHadoopCounters fail on Windows (daijy)
PIG-4541: Skewed full outer join does not return records if any relation is empty. Outer join does not
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 9e66959..b743c1e 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -430,7 +430,7 @@
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
- if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
+ if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
// Use custom edge
return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
edge.dataSourceType, edge.schedulingType, out, in);
@@ -671,11 +671,15 @@
// Set the right VertexManagerPlugin
if (tezOp.getEstimatedParallelism() != -1) {
if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
- // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
- // to decrease/increase parallelism of sorting vertex dynamically
- // based on the numQuantiles calculated by sample aggregation vertex
- vmPluginName = PartitionerDefinedVertexManager.class.getName();
- log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
+ if (tezOp.getVertexParallelism()==-1 && (
+ tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1||
+ tezOp.isSkewedJoin() &&getPlan().getPredecessors(tezOp).size()==2)) {
+ // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
+ // to decrease/increase parallelism of sorting vertex dynamically
+ // based on the numQuantiles calculated by sample aggregation vertex
+ vmPluginName = PartitionerDefinedVertexManager.class.getName();
+ log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
+ }
} else {
boolean containScatterGather = false;
boolean containCustomPartitioner = false;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
index 5eaa84d..a543d62 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
@@ -1639,6 +1639,7 @@
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
List<Boolean> flat = new ArrayList<Boolean>();
+ boolean containsOuter = false;
// Add corresponding POProjects
for (int i=0; i < 2; i++) {
ep = new PhysicalPlan();
@@ -1651,6 +1652,7 @@
if (!inner[i]) {
// Add an empty bag for outer join
CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+ containsOuter = true;
}
flat.add(true);
}
@@ -1681,14 +1683,16 @@
POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
for (int i = 0; i <= 2; i++) {
- // We need to send sample to left relation partitioner vertex, right relation load vertex,
- // and join vertex (IsFirstReduceOfKey in join vertex need sample file as well)
- joinJobs[i].setSampleOperator(sampleJobPair.first);
-
- // Configure broadcast edges for distribution map
- edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
- TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
- sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+ if (i != 2 || containsOuter) {
+ // Send the sample to the left relation partitioner vertex and the right relation load vertex;
+ // the join vertex (joinJobs[2]) only gets it for an outer join, where IsFirstReduceOfKey needs the sample file
+ joinJobs[i].setSampleOperator(sampleJobPair.first);
+
+ // Configure broadcast edges for distribution map
+ edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+ TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+ sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+ }
// Configure skewed partitioner for join
if (i != 2) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
index 07b22f1..b878bd0 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
@@ -148,7 +148,12 @@
parallelism = tezOp.getEstimatedParallelism();
}
if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
- if (!overrideRequestedParallelism) {
+ boolean additionalEdge = false;
+ if (tezOp.isGlobalSort() && getPlan().getPredecessors(tezOp).size() != 1 ||
+ tezOp.isSkewedJoin() && getPlan().getPredecessors(tezOp).size() != 2) {
+ additionalEdge = true;
+ }
+ if (!overrideRequestedParallelism && !additionalEdge) {
incrementTotalParallelism(tezOp, parallelism);
// PartitionerDefinedVertexManager will determine parallelism.
// So call setVertexParallelism with -1
@@ -168,6 +173,7 @@
ParallelConstantVisitor visitor =
new ParallelConstantVisitor(partitionerPred.plan, parallelism);
visitor.visit();
+ partitionerPred.setNeedEstimatedQuantile(false);
break;
}
}
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
index d899948..7cb17f9 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-25 -> Tez vertex scope-29,Tez vertex scope-38,Tez vertex scope-48,
-Tez vertex scope-38 -> Tez vertex scope-29,Tez vertex scope-48,Tez vertex scope-52,
+Tez vertex scope-38 -> Tez vertex scope-29,Tez vertex scope-48,
Tez vertex scope-48 -> Tez vertex scope-52,
Tez vertex scope-29 -> Tez vertex scope-52,
Tez vertex scope-52
@@ -55,7 +55,7 @@
|---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-38
# Plan on vertex
-POValueOutputTez - scope-47 -> [scope-29, scope-48, scope-52]
+POValueOutputTez - scope-47 -> [scope-29, scope-48]
|
|---New For Each(false)[tuple] - scope-46
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
index c9f84ed..8b4bfc6 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-27 -> Tez vertex scope-36,Tez vertex scope-46,
-Tez vertex scope-36 -> Tez vertex scope-28,Tez vertex scope-46,Tez vertex scope-50,
+Tez vertex scope-36 -> Tez vertex scope-28,Tez vertex scope-46,
Tez vertex scope-46 -> Tez vertex scope-50,
Tez vertex scope-28 -> Tez vertex scope-50,
Tez vertex scope-50
@@ -43,7 +43,7 @@
|---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-36
# Plan on vertex
-POValueOutputTez - scope-45 -> [scope-28, scope-46, scope-50]
+POValueOutputTez - scope-45 -> [scope-28, scope-46]
|
|---New For Each(false)[tuple] - scope-44
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
index 86666cd..544c43a 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
@@ -7,7 +7,7 @@
Tez vertex scope-30 -> Tez vertex scope-34,Tez vertex scope-36,
Tez vertex scope-34 -> Tez vertex scope-36,
Tez vertex scope-36 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-40 -> Tez vertex scope-62,
Tez vertex scope-62
@@ -67,7 +67,7 @@
|---POShuffledValueInputTez - scope-37 <- [scope-30, scope-34]
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-40, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
index 4f2dabc..ab9e443 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-40,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-40 -> Tez vertex scope-62,
Tez vertex scope-62
@@ -71,7 +71,7 @@
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-40, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-40, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
index edb072a..fe6e2d3 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-37,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-37,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-31 -> Tez vertex scope-35,Tez vertex scope-37,
Tez vertex scope-35 -> Tez vertex scope-37,
@@ -45,7 +45,7 @@
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-37, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-37, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
index 27442e1..fc14096 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
@@ -5,7 +5,7 @@
# TEZ DAG plan: pig-0_scope-0
#--------------------------------------------------
Tez vertex scope-30 -> Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48 -> Tez vertex scope-31,Tez vertex scope-58,Tez vertex scope-62,
+Tez vertex scope-48 -> Tez vertex scope-31,Tez vertex scope-58,
Tez vertex scope-58 -> Tez vertex scope-62,
Tez vertex scope-31 -> Tez vertex scope-62,
Tez vertex scope-62
@@ -43,7 +43,7 @@
|---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-48
# Plan on vertex
-POValueOutputTez - scope-57 -> [scope-31, scope-58, scope-62]
+POValueOutputTez - scope-57 -> [scope-31, scope-58]
|
|---New For Each(false)[tuple] - scope-56
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
index cfc4670..e94adb7 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
@@ -7,7 +7,7 @@
Tez vertex scope-114 -> Tez vertex scope-116,
Tez vertex scope-115 -> Tez vertex scope-116,
Tez vertex scope-116 -> Tez vertex scope-128,Tez vertex scope-138,
-Tez vertex scope-128 -> Tez vertex scope-120,Tez vertex scope-138,Tez vertex scope-142,
+Tez vertex scope-128 -> Tez vertex scope-120,Tez vertex scope-138,
Tez vertex scope-138 -> Tez vertex scope-142,
Tez vertex scope-120 -> Tez vertex scope-142,
Tez vertex scope-142
@@ -65,7 +65,7 @@
|---POShuffledValueInputTez - scope-117 <- [scope-114, scope-115]
Tez vertex scope-128
# Plan on vertex
-POValueOutputTez - scope-137 -> [scope-120, scope-138, scope-142]
+POValueOutputTez - scope-137 -> [scope-120, scope-138]
|
|---New For Each(false)[tuple] - scope-136
| |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
index 6c94a1c..0329d0b 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
@@ -7,7 +7,7 @@
Tez vertex scope-29 -> Tez vertex group scope-63,Tez vertex group scope-64,
Tez vertex scope-30 -> Tez vertex group scope-63,Tez vertex group scope-64,
Tez vertex group scope-64 -> Tez vertex scope-43,
-Tez vertex scope-43 -> Tez vertex scope-35,Tez vertex scope-53,Tez vertex scope-57,
+Tez vertex scope-43 -> Tez vertex scope-35,Tez vertex scope-53,
Tez vertex group scope-63 -> Tez vertex scope-53,
Tez vertex scope-53 -> Tez vertex scope-57,
Tez vertex scope-35 -> Tez vertex scope-57,
@@ -79,7 +79,7 @@
# No plan on vertex group
Tez vertex scope-43
# Plan on vertex
-POValueOutputTez - scope-52 -> [scope-35, scope-53, scope-57]
+POValueOutputTez - scope-52 -> [scope-35, scope-53]
|
|---New For Each(false)[tuple] - scope-51
| |
diff --git a/test/org/apache/pig/tez/TestTezAutoParallelism.java b/test/org/apache/pig/tez/TestTezAutoParallelism.java
index 0d689eb..fee4bbd 100644
--- a/test/org/apache/pig/tez/TestTezAutoParallelism.java
+++ b/test/org/apache/pig/tez/TestTezAutoParallelism.java
@@ -246,6 +246,58 @@
}
@Test
+ public void testSkewedFullJoinIncreaseParallelism() throws IOException{
+ // A skewed full join keeps its initial parallelism setting: the join vertex has a broadcast (sample)
+ // dependency, which prevents its parallelism from being changed. Verified via the part files in output6.
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
+ pigServer.store("C", "output6");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output6"), new PathFilter(){ // list the directory this test stored to
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(files.length, 5); // NOTE(review): count was previously checked against output5; re-confirm 5 for output6
+ }
+
+ @Test
+ public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
+ // A skewed join keeps its initial parallelism setting here: the join vertex has a broadcast (scalar)
+ // dependency, which prevents its parallelism from being changed. Verified via the part files in output7.
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
+ pigServer.registerQuery("D = load 'org.apache.pig.tez.TestTezAutoParallelism_1' as (name:chararray, age:int);");
+ pigServer.registerQuery("E = group D all;");
+ pigServer.registerQuery("F = foreach E generate COUNT(D) as count;");
+ pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
+ pigServer.store("G", "output7");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith("part")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ assertEquals(files.length, 4);
+ }
+
+ @Test
public void testIncreaseIntermediateParallelism1() throws IOException{
// User specified parallelism is overriden for intermediate step
String outputDir = "/tmp/testIncreaseIntermediateParallelism";