PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1796191 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 66741bd..bd2ffa3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -101,6 +101,8 @@
  
 BUG FIXES
 
+PIG-5231: PigStorage with -schema may produce inconsistent outputs with more fields (knoguchi)
+
 PIG-5224: Extra foreach from ColumnPrune preventing Accumulator usage (knoguchi)
 
 PIG-5235: Typecast with as-clause fails for tuple/bag with an empty schema (knoguchi)
diff --git a/src/org/apache/pig/builtin/PigStorage.java b/src/org/apache/pig/builtin/PigStorage.java
index 8d31d98..1cd0d8e 100644
--- a/src/org/apache/pig/builtin/PigStorage.java
+++ b/src/org/apache/pig/builtin/PigStorage.java
@@ -334,6 +334,18 @@
                     tupleIdx++;
                 }
             }
+            // If the input record somehow has more fields than the provided
+            // schema, drop the extra fields.
+            if( tup.size() > fieldSchemas.length ) {
+                int lastindex = tup.size() - 1;
+                List<Object> list = tup.getAll();
+                for(int i = lastindex; i >= fieldSchemas.length ; i--) {
+                    list.remove(i);
+                }
+                // Tuple.getAll() may not return a reference to the internal List,
+                // so create a new Tuple from the trimmed list.
+                tup =  mTupleFactory.newTupleNoCopy(list);
+            }
         }
         return tup;
     }
diff --git a/test/org/apache/pig/test/TestPigStorage.java b/test/org/apache/pig/test/TestPigStorage.java
index 32b3d1a..e9406f7 100644
--- a/test/org/apache/pig/test/TestPigStorage.java
+++ b/test/org/apache/pig/test/TestPigStorage.java
@@ -789,4 +789,35 @@
         pig.store("a", datadir + "aout", "PigStorage(',')");
     }
 
+    @Test
+    public void testPigStroageSchemaWithMultipleSchema() throws Exception {
+        pigContext.connect();
+        String query = "A = LOAD '" + datadir + "originput' using PigStorage(',') as (f1:chararray, f2:int);"
+                + "B = FOREACH A generate f1, f2, 3 as (f3:int);";
+        pig.registerQuery(query);
+        pig.store("A", datadir + "aout", "PigStorage('\\t', '-schema')");
+        pig.store("B", datadir + "bout", "PigStorage('\\t', '-schema')");
+
+        // We want to test the case where aout/.pig_schema is chosen for loading
+        // both aout AND bout.
+        // Schema selection is not deterministic because it is picked from a Set,
+        // so for this test we simply delete the other schema.
+        new File(datadir + "bout/.pig_schema" ).delete();
+
+        // Load from two directories whose records contain 2 fields and
+        // 3 fields, respectively.
+        pig.registerQuery("C = LOAD '" + datadir + "aout," + datadir + "bout ' using PigStorage('\\t', '-schema');");
+        Schema a_schema = pig.dumpSchema("A");
+        Schema c_schema = pig.dumpSchema("C");
+        Assert.assertEquals("PigStorage schema should pick up the .pig_schema from A", a_schema, c_schema);
+        Iterator<Tuple> iter = pig.openIterator("C");
+        int counter = 0;
+        while (iter.hasNext()) {
+            Assert.assertEquals("All tuples should only contain 2 fields defined in schema",
+                                2, iter.next().size());
+            counter++;
+        }
+        Assert.assertEquals(20, counter);
+    }
+
 }