PIG-5375: NullPointerException for multi-level self unions with Tez UnionOptimizer (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1862504 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index dfe5c3e..2922cdc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -92,9 +92,11 @@
  
 BUG FIXES
 
-PIG-5386: Pig local mode with bundled Hadoop broken
+PIG-5375: NullPointerException for multi-level self unions with Tez UnionOptimizer (knoguchi)
 
-PIG-5387: Test failures on JRE 11
+PIG-5386: Pig local mode with bundled Hadoop broken (nkollar)
+
+PIG-5387: Test failures on JRE 11 (nkollar)
 
 PIG-5383: OrcStorage fails when "bytearray" represents unknown type (knoguchi)
 
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
index 55cfb5a..6ea5a8b 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
@@ -289,18 +289,16 @@
                 if (storeVertexGroupOps[i] != null) {
                     continue;
                 }
-            }
-            if (existingVertexGroup != null) {
-                storeVertexGroupOps[i] = existingVertexGroup;
-                existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
-                existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
-                existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
-            } else {
                 storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
                 storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
                 storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
                 storeVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
                 tezPlan.add(storeVertexGroupOps[i]);
+            } else {
+                storeVertexGroupOps[i] = existingVertexGroup;
+                existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
+                existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
             }
         }
 
@@ -320,19 +318,36 @@
         TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
         String[] newOutputKeys = new String[unionOutputKeys.size()];
         for (int i=0; i < outputVertexGroupOps.length; i++) {
-            for (int j = 0; j < i; j++) {
-                if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
-                    outputVertexGroupOps[i] = outputVertexGroupOps[j];
-                    break;
+            TezOperator existingVertexGroup = null;
+            if (successors != null) {
+                for (TezOperator succ : successors) {
+                    if (succ.isVertexGroup()
+                        && unionOutputKeys.get(i).equals(succ.getVertexGroupInfo().getOutput()) ) {
+                        existingVertexGroup = succ;
+                        break;
+                    }
                 }
             }
-            if (outputVertexGroupOps[i] != null) {
-                continue;
+            if (existingVertexGroup == null) {
+                for (int j = 0; j < i; j++) {
+                    if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
+                        outputVertexGroupOps[i] = outputVertexGroupOps[j];
+                        break;
+                    }
+                }
+                if (outputVertexGroupOps[i] != null) {
+                    continue;
+                }
+                outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+                outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
+                outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
+                outputVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
+            } else {
+                outputVertexGroupOps[i] = existingVertexGroup;
+                existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
+                existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
             }
-            outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
-            outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
-            outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
-            outputVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
             newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
             tezPlan.add(outputVertexGroupOps[i]);
         }
@@ -619,18 +634,6 @@
         // Connect to outputVertexGroupOps
         for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
             TezOperator succOp = tezPlan.getOperator(entry.getKey());
-            // Case of union followed by union.
-            // unionOp.outEdges will not point to vertex group, but to its output.
-            // So find the vertex group if there is one.
-            TezOperator succOpVertexGroup = null;
-            for (TezOperator succ : successors) {
-                if (succ.isVertexGroup()
-                        && succOp.getOperatorKey().toString()
-                                .equals(succ.getVertexGroupInfo().getOutput())) {
-                    succOpVertexGroup = succ;
-                    break;
-                }
-            }
             TezEdgeDescriptor edge = entry.getValue();
             // Edge cannot be one to one as it will get input from two or
             // more union predecessors. Change it to SCATTER_GATHER
@@ -641,26 +644,14 @@
                 edge.inputClassName = UnorderedKVInput.class.getName();
             }
             TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
-            for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+            for (OperatorKey predKey : unionOp.getUnionMembers()) {
                 TezOperator pred = tezPlan.getOperator(predKey);
                 // Keep the output edge directly to successor
                 // Don't need to keep output edge for vertexgroup
                 pred.outEdges.put(entry.getKey(), edge);
                 succOp.inEdges.put(predKey, edge);
-                if (succOpVertexGroup != null) {
-                    succOpVertexGroup.getVertexGroupMembers().add(predKey);
-                    succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
-                    // Connect directly to the successor vertex group
-                    tezPlan.disconnect(pred, vertexGroupOp);
-                    tezPlan.connect(pred, succOpVertexGroup);
-                }
             }
-            if (succOpVertexGroup != null) {
-                succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
-                succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
-                //Discard the new vertex group created
-                tezPlan.remove(vertexGroupOp);
-            } else {
+            if(!tezPlan.pathExists(vertexGroupOp, succOp)) {
                 tezPlan.connect(vertexGroupOp, succOp);
             }
         }
diff --git a/test/e2e/pig/tests/multiquery.conf b/test/e2e/pig/tests/multiquery.conf
index ac927ef..b470d37 100644
--- a/test/e2e/pig/tests/multiquery.conf
+++ b/test/e2e/pig/tests/multiquery.conf
@@ -773,6 +773,19 @@
 SPLIT u3 INTO t if age > 75, u OTHERWISE;

 v = JOIN t BY name LEFT, c BY votername;

 store v into ':OUTPATH:';\,

+            },

+            {

+            # PIG-5375. multi-level Unions with splits

+            'num' => 15,

+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);

+b= load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);

+c= load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);

+a_and_b = UNION ONSCHEMA a, b;

+SPLIT a_and_b INTO a_and_b2 IF age < 30, a_and_b3 OTHERWISE;

+a_and_b_and_c = UNION ONSCHEMA c, a_and_b;

+v = UNION ONSCHEMA a_and_b_and_c, a_and_b2, a_and_b3;

+v2 = GROUP v by *;

+store v2 into ':OUTPATH:';\,

             }

             ] # end of tests

         },

diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
index 4e1035b..db95b77 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
@@ -34,13 +34,13 @@
     |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17
 Tez vertex scope-37
 # Plan on vertex
-e: Split - scope-66
+e: Split - scope-65
 |   |
-|   e: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-67	->	 scope-29
+|   e: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-66	->	 scope-29
 |   |
-|   f: Local Rearrange[tuple]{int}(false) - scope-68	->	 scope-53
+|   f: Local Rearrange[tuple]{int}(false) - scope-67	->	 scope-53
 |   |   |
-|   |   Project[int][0] - scope-69
+|   |   Project[int][0] - scope-68
 |
 |---a: New For Each(false,false)[bag] - scope-7
     |   |
@@ -55,13 +55,13 @@
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-38
 # Plan on vertex
-e: Split - scope-70
+e: Split - scope-69
 |   |
-|   e: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-71	->	 scope-29
+|   e: Store(file:///tmp/pigoutput1:org.apache.pig.builtin.PigStorage) - scope-70	->	 scope-29
 |   |
-|   f: Local Rearrange[tuple]{int}(false) - scope-72	->	 scope-53
+|   f: Local Rearrange[tuple]{int}(false) - scope-71	->	 scope-53
 |   |   |
-|   |   Project[int][0] - scope-73
+|   |   Project[int][0] - scope-72
 |
 |---c: New For Each(false,false)[bag] - scope-15
     |   |
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld
index b4645b6..27bced2 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld
@@ -27,77 +27,77 @@
     |---c: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-18
 Tez vertex scope-74
 # Plan on vertex
-1-3: Split - scope-166
+1-3: Split - scope-165
 |   |
-|   d: Local Rearrange[tuple]{chararray}(false) - scope-171	->	 scope-84
+|   d: Local Rearrange[tuple]{chararray}(false) - scope-170	->	 scope-84
 |   |   |
-|   |   Project[chararray][0] - scope-172
+|   |   Project[chararray][0] - scope-171
 |   |
-|   |---r: Filter[bag] - scope-167
+|   |---r: Filter[bag] - scope-166
 |       |   |
-|       |   Not Equal To[boolean] - scope-170
+|       |   Not Equal To[boolean] - scope-169
 |       |   |
-|       |   |---Project[chararray][0] - scope-168
+|       |   |---Project[chararray][0] - scope-167
 |       |   |
-|       |   |---Constant() - scope-169
+|       |   |---Constant() - scope-168
 |   |
-|   u2: Split - scope-181
+|   u2: Split - scope-180
 |   |   |
-|   |   POValueOutputTez - scope-191	->	 [scope-104]
+|   |   POValueOutputTez - scope-190	->	 [scope-104]
 |   |   |
-|   |   |---v: Limit - scope-190
+|   |   |---v: Limit - scope-189
 |   |       |
-|   |       |---t: Filter[bag] - scope-186
+|   |       |---t: Filter[bag] - scope-185
 |   |           |   |
-|   |           |   Not Equal To[boolean] - scope-189
+|   |           |   Not Equal To[boolean] - scope-188
 |   |           |   |
-|   |           |   |---Project[chararray][0] - scope-187
+|   |           |   |---Project[chararray][0] - scope-186
 |   |           |   |
-|   |           |   |---Constant() - scope-188
+|   |           |   |---Constant() - scope-187
 |   |           |
-|   |           |---e: Filter[bag] - scope-182
+|   |           |---e: Filter[bag] - scope-181
 |   |               |   |
-|   |               |   Equal To[boolean] - scope-185
+|   |               |   Equal To[boolean] - scope-184
 |   |               |   |
-|   |               |   |---Project[chararray][0] - scope-183
+|   |               |   |---Project[chararray][0] - scope-182
 |   |               |   |
-|   |               |   |---Constant() - scope-184
+|   |               |   |---Constant() - scope-183
 |   |   |
-|   |   POValueOutputTez - scope-201	->	 [scope-104]
+|   |   POValueOutputTez - scope-200	->	 [scope-104]
 |   |   |
-|   |   |---v: Limit - scope-200
+|   |   |---v: Limit - scope-199
 |   |       |
-|   |       |---t: Filter[bag] - scope-196
+|   |       |---t: Filter[bag] - scope-195
 |   |           |   |
-|   |           |   Not Equal To[boolean] - scope-199
+|   |           |   Not Equal To[boolean] - scope-198
 |   |           |   |
-|   |           |   |---Project[chararray][0] - scope-197
+|   |           |   |---Project[chararray][0] - scope-196
 |   |           |   |
-|   |           |   |---Constant() - scope-198
+|   |           |   |---Constant() - scope-197
 |   |           |
-|   |           |---f: Filter[bag] - scope-192
+|   |           |---f: Filter[bag] - scope-191
 |   |               |   |
-|   |               |   Equal To[boolean] - scope-195
+|   |               |   Equal To[boolean] - scope-194
 |   |               |   |
-|   |               |   |---Project[chararray][0] - scope-193
+|   |               |   |---Project[chararray][0] - scope-192
 |   |               |   |
-|   |               |   |---Constant(m) - scope-194
+|   |               |   |---Constant(m) - scope-193
 |   |
-|   |---u2: New For Each(false,false)[bag] - scope-180
+|   |---u2: New For Each(false,false)[bag] - scope-179
 |       |   |
-|       |   Project[chararray][0] - scope-178
+|       |   Project[chararray][0] - scope-177
 |       |   |
-|       |   Constant(DummyVal) - scope-179
+|       |   Constant(DummyVal) - scope-178
 |       |
-|       |---s: Filter[bag] - scope-173
+|       |---s: Filter[bag] - scope-172
 |           |   |
-|           |   Not[boolean] - scope-177
+|           |   Not[boolean] - scope-176
 |           |   |
-|           |   |---Not Equal To[boolean] - scope-176
+|           |   |---Not Equal To[boolean] - scope-175
 |           |       |
-|           |       |---Project[chararray][0] - scope-174
+|           |       |---Project[chararray][0] - scope-173
 |           |       |
-|           |       |---Constant() - scope-175
+|           |       |---Constant() - scope-174
 |
 |---a: New For Each(false)[bag] - scope-6
     |   |
@@ -108,77 +108,77 @@
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-2
 Tez vertex scope-75
 # Plan on vertex
-1-3: Split - scope-202
+1-3: Split - scope-201
 |   |
-|   d: Local Rearrange[tuple]{chararray}(false) - scope-207	->	 scope-84
+|   d: Local Rearrange[tuple]{chararray}(false) - scope-206	->	 scope-84
 |   |   |
-|   |   Project[chararray][0] - scope-208
+|   |   Project[chararray][0] - scope-207
 |   |
-|   |---r: Filter[bag] - scope-203
+|   |---r: Filter[bag] - scope-202
 |       |   |
-|       |   Not Equal To[boolean] - scope-206
+|       |   Not Equal To[boolean] - scope-205
 |       |   |
-|       |   |---Project[chararray][0] - scope-204
+|       |   |---Project[chararray][0] - scope-203
 |       |   |
-|       |   |---Constant() - scope-205
+|       |   |---Constant() - scope-204
 |   |
-|   u2: Split - scope-217
+|   u2: Split - scope-216
 |   |   |
-|   |   POValueOutputTez - scope-227	->	 [scope-104]
+|   |   POValueOutputTez - scope-226	->	 [scope-104]
 |   |   |
-|   |   |---v: Limit - scope-226
+|   |   |---v: Limit - scope-225
 |   |       |
-|   |       |---t: Filter[bag] - scope-222
+|   |       |---t: Filter[bag] - scope-221
 |   |           |   |
-|   |           |   Not Equal To[boolean] - scope-225
+|   |           |   Not Equal To[boolean] - scope-224
 |   |           |   |
-|   |           |   |---Project[chararray][0] - scope-223
+|   |           |   |---Project[chararray][0] - scope-222
 |   |           |   |
-|   |           |   |---Constant() - scope-224
+|   |           |   |---Constant() - scope-223
 |   |           |
-|   |           |---e: Filter[bag] - scope-218
+|   |           |---e: Filter[bag] - scope-217
 |   |               |   |
-|   |               |   Equal To[boolean] - scope-221
+|   |               |   Equal To[boolean] - scope-220
 |   |               |   |
-|   |               |   |---Project[chararray][0] - scope-219
+|   |               |   |---Project[chararray][0] - scope-218
 |   |               |   |
-|   |               |   |---Constant() - scope-220
+|   |               |   |---Constant() - scope-219
 |   |   |
-|   |   POValueOutputTez - scope-237	->	 [scope-104]
+|   |   POValueOutputTez - scope-236	->	 [scope-104]
 |   |   |
-|   |   |---v: Limit - scope-236
+|   |   |---v: Limit - scope-235
 |   |       |
-|   |       |---t: Filter[bag] - scope-232
+|   |       |---t: Filter[bag] - scope-231
 |   |           |   |
-|   |           |   Not Equal To[boolean] - scope-235
+|   |           |   Not Equal To[boolean] - scope-234
 |   |           |   |
-|   |           |   |---Project[chararray][0] - scope-233
+|   |           |   |---Project[chararray][0] - scope-232
 |   |           |   |
-|   |           |   |---Constant() - scope-234
+|   |           |   |---Constant() - scope-233
 |   |           |
-|   |           |---f: Filter[bag] - scope-228
+|   |           |---f: Filter[bag] - scope-227
 |   |               |   |
-|   |               |   Equal To[boolean] - scope-231
+|   |               |   Equal To[boolean] - scope-230
 |   |               |   |
-|   |               |   |---Project[chararray][0] - scope-229
+|   |               |   |---Project[chararray][0] - scope-228
 |   |               |   |
-|   |               |   |---Constant(m) - scope-230
+|   |               |   |---Constant(m) - scope-229
 |   |
-|   |---u2: New For Each(false,false)[bag] - scope-216
+|   |---u2: New For Each(false,false)[bag] - scope-215
 |       |   |
-|       |   Project[chararray][0] - scope-214
+|       |   Project[chararray][0] - scope-213
 |       |   |
-|       |   Constant(DummyVal) - scope-215
+|       |   Constant(DummyVal) - scope-214
 |       |
-|       |---s: Filter[bag] - scope-209
+|       |---s: Filter[bag] - scope-208
 |           |   |
-|           |   Not[boolean] - scope-213
+|           |   Not[boolean] - scope-212
 |           |   |
-|           |   |---Not Equal To[boolean] - scope-212
+|           |   |---Not Equal To[boolean] - scope-211
 |           |       |
-|           |       |---Project[chararray][0] - scope-210
+|           |       |---Project[chararray][0] - scope-209
 |           |       |
-|           |       |---Constant() - scope-211
+|           |       |---Constant() - scope-210
 |
 |---b: New For Each(false)[bag] - scope-11
     |   |
@@ -254,7 +254,7 @@
         |   |---Constant({()}) - scope-35
         |
         |---d: Package(Packager)[tuple]{chararray} - scope-26
-Tez vertex group scope-121	<-	 [scope-84, scope-84, scope-76, scope-74, scope-75]	->	 scope-104
+Tez vertex group scope-121	<-	 [scope-84, scope-84, scope-74, scope-74, scope-75, scope-75]	->	 scope-104
 # No plan on vertex group
 Tez vertex scope-104
 # Plan on vertex