PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1847856 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d240afa..fc51596 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -88,6 +88,8 @@
  
 BUG FIXES
 
+PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
+
 PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini)
 
 PIG-5355: Negative progress report by HBaseTableRecordReader (satishsaley via knoguchi)
diff --git a/src/org/apache/pig/newplan/logical/relational/LOUnion.java b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
index fbd42ed..137f038 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOUnion.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOUnion.java
@@ -34,9 +34,11 @@
 
 public class LOUnion extends LogicalRelationalOperator {
     private boolean onSchema;
+
+    private static final String UID_SEPARATOR = "_"; // delimiter between uids in a nested-uid path string
     
     // uid mapping from output uid to input uid
-    private List<Pair<Long, Long>> uidMapping = new ArrayList<Pair<Long, Long>>();
+    private List<Pair<Long, String>> uidMapping = new ArrayList<Pair<Long, String>>();
     
     public LOUnion(OperatorPlan plan) {
         super("LOUnion", plan);
@@ -108,7 +110,7 @@
         }
 
         // Bring back cached uid if any; otherwise, cache uid generated
-        setMergedSchemaUids(mergedSchema, inputSchemas);
+        setMergedSchemaUids(mergedSchema, inputSchemas, "");
 
         return schema = mergedSchema;
     }
@@ -145,7 +147,7 @@
         return mergedSchema;
     }
 
-    private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas)
+    private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas, String nested_uids)
     throws FrontendException {
 
         for (int i=0;i<mergedSchema.size();i++) {
@@ -170,7 +172,7 @@
                     }
 
                     if (uid < 0) {
-                        uid = getCachedOuputUid(inputFieldSchema.uid);
+                        uid = getCachedOuputUid(createNestedUids(nested_uids,inputFieldSchema.uid));
                         if (uid >= 0 && outputFieldSchema.schema == null) break;
                     }
                 }
@@ -186,13 +188,13 @@
                         matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
                         if (matchedInputFieldSchema!=null) {
                             inputUid = matchedInputFieldSchema.uid;
-                            uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+                            uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid)));
                         }
                     }
                     else {
                         matchedInputFieldSchema = mergedSchema.getField(i);
                         inputUid = inputSchema.getField(i).uid;
-                        uidMapping.add(new Pair<Long, Long>(uid, inputUid));
+                        uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid)));
                     }
                 }
             }
@@ -201,16 +203,28 @@
 
             // This field has a schema. Assign uids to it as well
             if (outputFieldSchema.schema != null) {
-                setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas);
+                setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas, createNestedUids(nested_uids,outputFieldSchema.uid));
             }
         }
     }
 
-    private long getCachedOuputUid(long inputUid) {
+    private String createNestedUids(String nested_uids, long new_uid) {
+        StringBuilder sb = new StringBuilder(nested_uids);
+        sb.append(UID_SEPARATOR);
+        sb.append(new_uid);
+        return sb.toString();
+    }
+
+    private long getLeafUid(String nested_uids) { // returns the last uid of a separator-joined uid path
+        String [] uid_root_to_leaf = nested_uids.split(UID_SEPARATOR);
+        return Long.parseLong(uid_root_to_leaf[uid_root_to_leaf.length-1]); // parse straight to primitive; no Long boxing
+    }
+
+    private long getCachedOuputUid(String nested_input_uids) {
         long uid = -1;
         
-        for (Pair<Long, Long> pair : uidMapping) {
-            if (pair.second==inputUid) {
+        for (Pair<Long, String> pair : uidMapping) {
+            if (pair.second.equals(nested_input_uids)) {
                 uid = pair.first;
                 break;
             }
@@ -237,18 +251,18 @@
     }
 
     // Get input uids mapping to the output uid
-    public Set<Long> getInputUids(long uid) {
+    public Set<Long> getInputUids(long outputuid) {
         Set<Long> result = new HashSet<Long>();
-        for (Pair<Long, Long> pair : uidMapping) {
-            if (pair.first==uid)
-                result.add(pair.second);
+        for (Pair<Long, String> pair : uidMapping) {
+            if (pair.first==outputuid)
+                result.add(getLeafUid(pair.second));
         }
         return result;
     }
     
     @Override
     public void resetUid() {
-        uidMapping = new ArrayList<Pair<Long, Long>>();
+        uidMapping = new ArrayList<Pair<Long, String>>();
     }
     
     public List<Operator> getInputs() {
diff --git a/test/org/apache/pig/test/TestNewPlanColumnPrune.java b/test/org/apache/pig/test/TestNewPlanColumnPrune.java
index 67a02af..8193bf6 100644
--- a/test/org/apache/pig/test/TestNewPlanColumnPrune.java
+++ b/test/org/apache/pig/test/TestNewPlanColumnPrune.java
@@ -482,6 +482,35 @@
         }
     }
 
+    @Test
+    public void testUnionOnschemaWithInnerBag() throws Exception  {
+        // After handling of inner bags was added to Union-onschema,
+        // ColumnPrune broke because uids inside the relation overlapped
+        // with uids inside the inner bag (PIG-5370)
+        String query = "A0 = load 'd.txt' as (a0:int, a1:int, a2:int, a3:int);" +
+        "A = FOREACH A0 GENERATE a0, a1, a2;" +
+        "B = FOREACH (GROUP A by (a0,a1)) {" +
+        "    A_FOREACH = FOREACH A GENERATE a1,a2;" +
+        "    GENERATE A, FLATTEN(A_FOREACH) as (a1,a2);" +
+        "}" +
+        "C = load 'd2.txt' as (A:bag{tuple:(a0:int, a1:int, a2:int)}, a1:int,a2:int);" +
+        "Z = UNION ONSCHEMA B, C;"  +
+        "store Z into 'empty';";
+
+        LogicalPlan newLogicalPlan = buildPlan(query);
+
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        System.err.println(newLogicalPlan);
+        Iterator<Operator> iter = newLogicalPlan.getOperators();
+        while (iter.hasNext()) {
+            Operator o = iter.next();
+            LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
+            if (lro == null || lro.getAlias() == null) continue;
+            assertNotNull(lro.getSchema());
+        }
+    }
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
@@ -505,4 +534,3 @@
         }
     }
 }
-