PIG-5341: PigStorage with -tagFile/-tagPath produces incorrect results with column pruning (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1832948 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 006538c..975d238 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -66,6 +66,8 @@
  
 BUG FIXES
 
+PIG-5341: PigStorage with -tagFile/-tagPath produces incorrect results with column pruning (knoguchi)
+
 PIG-5335: Error message from range projection completely misleading (knoguchi)
 
 PIG-5333: LoadCaster sometimes not set for complex type (knoguchi)
diff --git a/src/org/apache/pig/builtin/PigStorage.java b/src/org/apache/pig/builtin/PigStorage.java
index 1cd0d8e..dc049f1 100644
--- a/src/org/apache/pig/builtin/PigStorage.java
+++ b/src/org/apache/pig/builtin/PigStorage.java
@@ -252,10 +252,10 @@
             }
             mRequiredColumnsInitialized = true;
         }
-        //Prepend input source path if source tagging is enabled
-        if(tagFile) {
+        // Prepend input source path if source tagging is enabled and column 0 (the tag) is not pruned
+        if (tagFile && (mRequiredColumns == null || mRequiredColumns[0])) {
             mProtoTuple.add(new DataByteArray(sourcePath.getName()));
-        } else if (tagPath) {
+        } else if (tagPath && (mRequiredColumns == null || mRequiredColumns[0])) {
             mProtoTuple.add(new DataByteArray(sourcePath.toString()));
         }
 
@@ -268,7 +268,9 @@
             byte[] buf = value.getBytes();
             int len = value.getLength();
             int start = 0;
-            int fieldID = 0;
+            // With tagging enabled, mRequiredColumns is indexed against a schema whose
+            // index 0 is the tagFile/tagPath column, so data fields start at index 1
+            int fieldID = tagFile || tagPath ? 1 : 0;
             for (int i = 0; i < len; i++) {
                 if (buf[i] == fieldDel) {
                     if (mRequiredColumns==null || (mRequiredColumns.length>fieldID && mRequiredColumns[fieldID]))
diff --git a/test/org/apache/pig/test/TestPigStorage.java b/test/org/apache/pig/test/TestPigStorage.java
index e9406f7..5c9b571 100644
--- a/test/org/apache/pig/test/TestPigStorage.java
+++ b/test/org/apache/pig/test/TestPigStorage.java
@@ -460,14 +460,54 @@
         pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-schema');");
         pig.registerQuery("EventsName = foreach Events generate name;");
         Iterator<Tuple> sessions = pig.openIterator("EventsName");
-        sessions.next().toString().equals("(1)");
-        sessions.next().toString().equals("(2)");
-        sessions.next().toString().equals("(4)");
-        sessions.next().toString().equals("(2)");
-        sessions.next().toString().equals("(4)");
-        sessions.next().toString().equals("(1)");
-        sessions.next().toString().equals("()");
-        Assert.assertFalse(sessions.hasNext());
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "('peter')", "('samir')", "('michael')", "('peter')", "('peter')", "('samir')", "('john')"
+                });
+        Util.checkQueryOutputs(sessions, expectedResults);
+    }
+
+    @Test
+    public void testColumnPruneWithSchemaAndTagPath() throws IOException {
+        Util.createLocalInputFile(datadir + "originput2",
+                new String[] {"peter\t1", "samir\t2", "michael\t4",
+                "peter\t2", "peter\t4", "samir\t1", "john\t"
+        });
+        Util.createLocalInputFile(datadir + ".pig_schema",
+                new String[] {
+                "{\"fields\":[{\"name\":\"name\",\"type\":55,\"schema\":null," +
+                "\"description\":\"autogenerated from Pig Field Schema\"}," +
+                "{\"name\":\"val\",\"type\":10,\"schema\":null,\"description\":"+
+                "\"autogenerated from Pig Field Schema\"}],\"version\":0," +
+                "\"sortKeys\":[],\"sortKeyOrders\":[]}"
+        });
+        pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-schema -tagPath');");
+        pig.registerQuery("EventsName = foreach Events generate val;");
+        Iterator<Tuple> sessions = pig.openIterator("EventsName");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "(1)", "(2)", "(4)", "(2)", "(4)", "(1)", "(null)"
+                });
+        Util.checkQueryOutputs(sessions, expectedResults);
+    }
+
+    @Test
+    public void testColumnPruneWithTagFile() throws IOException {
+        // Two cases need coverage: the tag column being pruned and the tag column
+        // being kept.  This test covers the kept case; the pruned case is covered
+        // by testColumnPruneWithSchemaAndTagPath above ('-schema -tagPath').
+        Util.createLocalInputFile(datadir + "originput2",
+                new String[] {"peter\t1", "samir\t2", "michael\t4",
+                "peter\t2", "peter\t4", "samir\t1", "john\t"
+        });
+        pig.registerQuery("Events = LOAD '" + datadir + "originput2' USING PigStorage('\\t', '-tagFile') as (filename:chararray, name:chararray,val:int);");
+        pig.registerQuery("EventsName = foreach Events generate filename, val;");
+        Iterator<Tuple> sessions = pig.openIterator("EventsName");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "('originput2',1)", "('originput2',2)", "('originput2',4)", "('originput2',2)", "('originput2',4)", "('originput2',1)", "('originput2',null)"
+                });
+        Util.checkQueryOutputs(sessions, expectedResults);
     }
 
     @Test