PIG-5404: FLATTEN infers wrong datatype (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1882546 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f2710f3..0580cc6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -100,6 +100,8 @@
  
 BUG FIXES
 
+PIG-5404: FLATTEN infers wrong datatype (knoguchi)
+
 PIG-5243: describe with typecast on as-clause shows the types before the typecasting (knoguchi)
 
 PIG-5403: streaming job stuck with script failure when combined with ORDER BY (knoguchi)
diff --git a/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java b/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
index dd76dc7..304012c 100644
--- a/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
+++ b/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
@@ -200,12 +200,15 @@
         gen.setFlattenFlags(new boolean[index]);
         if (needCast) {
             // Insert the casterForEach into the plan and patch up the plan.
+            plan.add(casterForEach);
             List <Operator> successorOps = plan.getSuccessors(foreach);
             if (successorOps != null && successorOps.size() > 0){
-                Operator next = plan.getSuccessors(foreach).get(0);
-                plan.insertBetween(foreach, casterForEach, next);
-            }else{
-                plan.add(casterForEach);
+                // since successorOps will be updated as part of the inserts,
+                // creating a shallow-copy first before traversing the list
+                for (Operator next : successorOps.toArray(new Operator [successorOps.size()])) {
+                    plan.insertBetween(foreach, casterForEach, next);
+                }
+            } else {
                 plan.connect(foreach,casterForEach);
             }
 
diff --git a/test/org/apache/pig/test/TestPlanGeneration.java b/test/org/apache/pig/test/TestPlanGeneration.java
index 8a150c7..6955899 100644
--- a/test/org/apache/pig/test/TestPlanGeneration.java
+++ b/test/org/apache/pig/test/TestPlanGeneration.java
@@ -464,6 +464,27 @@
     }
 
     @Test
+    public void testForEachWithCast8() throws Exception {
+        //ForEachWithCast + Split (PIG-5404)
+        String query = "A = load 'foo' as (a:int, b:int);\n" +
+                "B = foreach A generate a as a0:chararray, b as b:int;\n" +
+                "store B into 'output1';\n" +
+                "store B into 'output2';\n" ;
+
+        LogicalPlan lp = Util.parse(query, pc);
+        Util.optimizeNewLP(lp);
+
+        assertEquals("Two LOStores should be created", 2, lp.getSinks().size());
+        for(Operator leave : lp.getSinks() ) {
+            LOStore loStore = (LOStore) leave;
+            assertEquals("a0",loStore.getSchema().getField(0).alias);
+            assertEquals(DataType.CHARARRAY, loStore.getSchema().getField(0).type);
+            assertEquals("b", loStore.getSchema().getField(1).alias);
+            assertEquals(DataType.INTEGER, loStore.getSchema().getField(1).type);
+        }
+    }
+
+    @Test
     // See PIG-2315
     public void testAsType1() throws Exception {
         Data data = Storage.resetData(ps);