PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1819344 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 694a637..a443926 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@
 OPTIMIZATIONS
  
 BUG FIXES
+PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
+
 PIG-5300: hashCode for Bag needs to be order independent (knoguchi)
 
 PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
diff --git a/src/org/apache/pig/newplan/logical/relational/LOUnion.java b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
index b919fc5..fbd42ed 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOUnion.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
@@ -108,60 +108,16 @@
         }
 
         // Bring back cached uid if any; otherwise, cache uid generated
-        for (int i=0;i<mergedSchema.size();i++)
-        {
-            LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i);
+        setMergedSchemaUids(mergedSchema, inputSchemas);
 
-            long uid = -1;
-            
-            // Search all the cached uid mappings by input field to see if 
-            // we've cached an output uid for this output field
-            for (LogicalSchema inputSchema : inputSchemas) {
-                LogicalSchema.LogicalFieldSchema inputFieldSchema;
-                if (onSchema) {
-                    inputFieldSchema = inputSchema.getFieldSubNameMatch(outputFieldSchema.alias);
-                } else {
-                    inputFieldSchema = inputSchema.getField(i);
-                }
-                
-                if (inputFieldSchema != null) {
-                    uid = getCachedOuputUid(inputFieldSchema.uid);
-                    if (uid >= 0) break;
-                }
-            }
-            
-            // No cached uid. Allocate one, and locate and cache all inputs.
-            if (uid==-1) {
-                uid = LogicalExpression.getNextUid();
-                for (LogicalSchema inputSchema : inputSchemas) {
-                    long inputUid;
-                    LogicalFieldSchema matchedInputFieldSchema;
-                	if (onSchema) {
-                	    matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
-                        if (matchedInputFieldSchema!=null) {
-                            inputUid = matchedInputFieldSchema.uid;
-                            uidMapping.add(new Pair<Long, Long>(uid, inputUid));
-                        }
-                    }
-                    else {
-                        matchedInputFieldSchema = mergedSchema.getField(i);
-	                	inputUid = inputSchema.getField(i).uid;
-	                	uidMapping.add(new Pair<Long, Long>(uid, inputUid));
-                    }
-                }
-            }
-
-            outputFieldSchema.uid = uid;
-        }
-        
         return schema = mergedSchema;
     }
 
     /**
      * create schema for union-onschema
      */
-    private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas, 
-            List<String> inputAliases) 
+    private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas,
+            List<String> inputAliases)
     throws FrontendException {
         ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
         for (int i = 0; i < inputSchemas.size(); i++){
@@ -175,20 +131,81 @@
             }
             schemas.add( sch );
         }
-        
+
         //create the merged schema
         LogicalSchema mergedSchema = null;
         try {
-            mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );   
+            mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
         } catch(FrontendException e)                 {
             String msg = "Error merging schemas for union operator : "
                 + e.getMessage();
             throw new FrontendException(this, msg, 1116, PigException.INPUT, e);
         }
-        
+
         return mergedSchema;
     }
-    
+
+    /**
+     * Assigns a uid to each field of mergedSchema, recursing into inner schemas.
+     * A cached output uid is reused when one exists for a matching input field;
+     * otherwise a new uid is allocated and mapped to the matching input uids.
+     *
+     * @param mergedSchema schema whose field uids are set in place
+     * @param inputSchemas input schemas at the same nesting level as mergedSchema
+     */
+    private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas)
+    throws FrontendException {
+        for (int i=0;i<mergedSchema.size();i++) {
+            LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i);
+            long uid = -1;
+            // Inner schemas of the matching input fields, passed to the recursive call
+            List<LogicalSchema> fieldInputSchemas = new ArrayList<>(inputSchemas.size());
+
+            // Search all the cached uid mappings by input field to see if
+            // we've cached an output uid for this output field
+            for (LogicalSchema inputSchema : inputSchemas) {
+                LogicalSchema.LogicalFieldSchema inputFieldSchema;
+                if (onSchema) {
+                    inputFieldSchema = inputSchema.getFieldSubNameMatch(outputFieldSchema.alias);
+                } else {
+                    inputFieldSchema = inputSchema.getField(i);
+                }
+                if (inputFieldSchema == null) {
+                    continue;
+                }
+                if (inputFieldSchema.schema != null) {
+                    fieldInputSchemas.add(inputFieldSchema.schema);
+                }
+                if (uid < 0) {
+                    uid = getCachedOuputUid(inputFieldSchema.uid);
+                    // Keep scanning when there is an inner schema, so that every
+                    // input's inner schema is collected for the recursion below
+                    if (uid >= 0 && outputFieldSchema.schema == null) break;
+                }
+            }
+
+            // No cached uid. Allocate one, and locate and cache all inputs.
+            if (uid==-1) {
+                uid = LogicalExpression.getNextUid();
+                for (LogicalSchema inputSchema : inputSchemas) {
+                    if (onSchema) {
+                        LogicalFieldSchema matched = inputSchema.getFieldSubNameMatch(outputFieldSchema.alias);
+                        if (matched != null) {
+                            uidMapping.add(new Pair<Long, Long>(uid, matched.uid));
+                        }
+                    } else {
+                        uidMapping.add(new Pair<Long, Long>(uid, inputSchema.getField(i).uid));
+                    }
+                }
+            }
+            outputFieldSchema.uid = uid;
+            // This field has a schema. Assign uids to it as well
+            if (outputFieldSchema.schema != null) {
+                setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas);
+            }
+        }
+    }
+
     private long getCachedOuputUid(long inputUid) {
         long uid = -1;
         
diff --git a/test/org/apache/pig/test/TestUnionOnSchema.java b/test/org/apache/pig/test/TestUnionOnSchema.java
index 3dea37f..60081c3 100644
--- a/test/org/apache/pig/test/TestUnionOnSchema.java
+++ b/test/org/apache/pig/test/TestUnionOnSchema.java
@@ -478,6 +478,43 @@
      * Test UNION ONSCHEMA on 3 inputs 
      */
     @Test
+    public void testUnionOnSchemaInnerSchema() throws Exception {
+        // Both relations load the same file with the same schema; b is a bag
+        // whose inner tuple fields are read by the nested foreach below
+        final String source = INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : long, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); ";
+
+        // The addition in the inner foreach will fail if the inner schema's uids
+        // are all set to -1, since the code that finds the inner load's schema will
+        // match the last item in b's schema, which is a chararray
+        final String script = "  l1 = load '" + source
+            + "l2 = load '" + source
+            + "u = union onschema l1, l2; "
+            + "p = foreach u { x = foreach b GENERATE c1 + 5 as c3; GENERATE i, c, x; }";
+
+        final String[] expectedRows = {
+            "(1L,'abc',{(6),(6)})",
+            "(5L,'def',{(7),(7)})",
+            "(1L,'abc',{(6),(6)})",
+            "(5L,'def',{(7),(7)})"
+        };
+
+        PigServer pigServer = new PigServer(Util.getLocalTestMode());
+        Util.registerMultiLineQuery(pigServer, script);
+        pigServer.explain("p", System.out);
+        Iterator<Tuple> actual = pigServer.openIterator("p");
+
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(expectedRows);
+        Util.checkQueryOutputsAfterSort(actual, expected);
+    }
+
+    /**
+     * Test UNION ONSCHEMA on 3 inputs.
+     *
+     * @throws Exception on any failure while running the test
+     */
+    @Test
     public void testUnionOnSchema3Inputs() throws Exception {
         PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =