PIG-5201: Null handling on FLATTEN (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1817591 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fd01d2d..e3d869a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@
  
 BUG FIXES
 
+PIG-5201: Null handling on FLATTEN (knoguchi)
+
 PIG-5315: pig.script is not set for scripts run via PigServer (satishsaley via rohini)
 
 PIG-5310: MergeJoin throwing NullPointer Exception (satishsaley via rohini)
diff --git a/src/docs/src/documentation/content/xdocs/basic.xml b/src/docs/src/documentation/content/xdocs/basic.xml
index 9892ff5..9f14164 100644
--- a/src/docs/src/documentation/content/xdocs/basic.xml
+++ b/src/docs/src/documentation/content/xdocs/basic.xml
@@ -1225,6 +1225,20 @@
 (joe,18,2.5,joe,18,2.5)
 </source>
    </section>
+   <section id="nulls_flatten">
+   <title>Nulls and FLATTEN Operator</title>
+   <p>The FLATTEN operator handles null value differently based on its schema.</p>
+   <p>For null tuples, FLATTEN(null) produces multiples nulls based on the number of elements in the schema for that field.
+   If tuple has no schema, FLATTEN(null) simply returns a single null. </p>
+   <p>For null bags, we would have liked to discard the row just like we do with flatten of an empty bag.
+   However, it was too late by the time we noticed this inconsistency.
+   In order to preserve the backward compatibility, FLATTEN(null) for bag produces multiples nulls
+   based on the number of elements defined for the schema of this bag.
+   If no schema, a single null is returned. </p>
+   <p>For bags containing some null tuples, it follows the same rule as flatten of null tuples described above. </p>
+   <p>For null maps, FLATTEN(null) produces 2 nulls to represent the key and the value.</p>
+   <p>For null with other types, FLATTEN(null) simply returns a single null.</p>
+   </section>
    
    </section>
   
@@ -5453,8 +5467,11 @@
       we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
    </p>
 
+   <p>For other types, flatten becomes a no-op and simply returns the passed value. </p>
+
    <p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated. 
    (See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
+   <p>As for flatten with null values, see <a href="#nulls_flatten">Nulls and FLATTEN operator</a>.</p>
    
    <source>
 grunt> cat empty.bag
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
index aba5131..e9fa5a6 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
@@ -74,6 +74,8 @@
     // so we can also save on the Boolean.booleanValue() calls
     protected boolean[] isToBeFlattenedArray;
 
+    private int[] flattenNumFields = null;
+
     protected int noItems;
 
     //Since the plan has a generate, this needs to be maintained
@@ -446,6 +448,8 @@
                 } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
                     its[i] = ((Map)bags[i]).entrySet().iterator();
                 } else {
+                    // This includes FLATTEN(null) for bag and map
+                    // in addition to non-null values from other data types
                     its[i] = null;
                 }
             }
@@ -490,7 +494,6 @@
                 if(getReporter()!=null) {
                     getReporter().progress();
                 }
-                //createTuple(data);
                 res.result = createTuple(data);
                 res.returnStatus = POStatus.STATUS_OK;
                 return res;
@@ -538,17 +541,23 @@
         for(int i = 0; i < data.length; ++i) {
             Object in = data[i];
 
-            if(isToBeFlattenedArray[i] && in instanceof Tuple) {
+            if(!isToBeFlattenedArray[i]) {
+                if (knownSize) {
+                    out.set(idx++, in);
+                } else {
+                    out.append(in);
+                }
+            } else if(in instanceof Tuple) {
                 Tuple t = (Tuple)in;
                 int size = t.size();
                 for(int j = 0; j < size; ++j) {
                     if (knownSize) {
                         out.set(idx++, t.get(j));
                     } else {
-                    out.append(t.get(j));
+                        out.append(t.get(j));
+                    }
                 }
-                }
-            } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+            } else if (in instanceof Map.Entry) {
                 Map.Entry entry = (Map.Entry)in;
                 if (knownSize) {
                     out.set(idx++, entry.getKey());
@@ -557,14 +566,25 @@
                     out.append(entry.getKey());
                     out.append(entry.getValue());
                 }
+            } else if (in == null &&
+                       flattenNumFields != null && flattenNumFields[i] != 0 ) {
+                // Handling of FLATTEN(null) here.
+                // Expanding to multiple nulls depending on the schema
+                for( int j = 0; j < flattenNumFields[i]; j++ ) {
+                    if (knownSize) {
+                        out.set(idx++, null);
+                    } else {
+                        out.append(null);
+                    }
+                }
             } else {
                 if (knownSize) {
                     out.set(idx++, in);
-            } else {
-                out.append(in);
+                } else {
+                    out.append(in);
+                }
             }
         }
-        }
         if (inpTuple != null) {
             return illustratorMarkup(inpTuple, out, 0);
         } else {
@@ -706,6 +726,7 @@
         clone.addOriginalLocation(alias, getOriginalLocations());
         clone.endOfAllInputProcessing = endOfAllInputProcessing;
         clone.mapSideOnly = mapSideOnly;
+        clone.flattenNumFields = flattenNumFields;
         return clone;
     }
 
@@ -714,6 +735,10 @@
         return processingPlan;
     }
 
+    public void setFlattenNumFields (int [] flattenNumFields) {
+        this.flattenNumFields = flattenNumFields;
+    }
+
     protected void setUpFlattens(List<Boolean> isToBeFlattened) {
         if(isToBeFlattened == null) {
             isToBeFlattenedArray = null;
diff --git a/src/org/apache/pig/impl/util/TupleFormat.java b/src/org/apache/pig/impl/util/TupleFormat.java
index ad76b52..1a00663 100644
--- a/src/org/apache/pig/impl/util/TupleFormat.java
+++ b/src/org/apache/pig/impl/util/TupleFormat.java
@@ -47,32 +47,34 @@
     public static String format(Tuple tuple) {
         StringBuilder sb = new StringBuilder();
         sb.append('(');
-        for (int i = 0; i < tuple.size(); ++i) {
-            try {
-                Object d = tuple.get(i);
-                if (d != null) {
-                    if (d instanceof Map) {
-                        sb.append(DataType.mapToString((Map<String, Object>) d));
-                    } else if (d instanceof Tuple) {
-                        Tuple t = (Tuple) d;
-                        sb.append(TupleFormat.format(t));
-                    } else if (d instanceof DataBag){
-                        DataBag bag=(DataBag)d;
-                        sb.append(BagFormat.format(bag));
+        if( tuple != null ) {
+            for (int i = 0; i < tuple.size(); ++i) {
+                try {
+                    Object d = tuple.get(i);
+                    if (d != null) {
+                        if (d instanceof Map) {
+                            sb.append(DataType.mapToString((Map<String, Object>) d));
+                        } else if (d instanceof Tuple) {
+                            Tuple t = (Tuple) d;
+                            sb.append(TupleFormat.format(t));
+                        } else if (d instanceof DataBag){
+                            DataBag bag=(DataBag)d;
+                            sb.append(BagFormat.format(bag));
+                        }
+                        else {
+                            sb.append(d.toString());
+                        }
+                    } else {
+                        sb.append("");
                     }
-                    else {
-                        sb.append(d.toString());
-                    }
-                } else {
-                    sb.append("");
+                    if (i != tuple.size() - 1)
+                        sb.append(",");
+                } catch (ExecException e) {
+                    e.printStackTrace();
+                    mLog.warn("Exception when format tuple", e);
                 }
-                if (i != tuple.size() - 1)
-                    sb.append(",");
-            } catch (ExecException e) {
-                e.printStackTrace();
-                mLog.warn("Exception when format tuple", e);
-            }
 
+            }
         }
         sb.append(')');
         return sb.toString();
diff --git a/src/org/apache/pig/newplan/logical/relational/LOGenerate.java b/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
index 2609c2d..13f5b38 100644
--- a/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
+++ b/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
@@ -33,6 +33,7 @@
 public class LOGenerate extends LogicalRelationalOperator {
      private List<LogicalExpressionPlan> outputPlans;
      private boolean[] flattenFlags;
+     private int[] flattenNumFields;
      // mUserDefinedSchema is the original input from the user, we don't suppose
      // to store uid in mUserDefinedSchema
      private List<LogicalSchema> mUserDefinedSchema = null;
@@ -74,7 +75,9 @@
         outputPlanSchemas = new ArrayList<LogicalSchema>();
         expSchemas = new ArrayList<LogicalSchema>();
         
+        flattenNumFields = new int[outputPlans.size()];
         for(int i=0; i<outputPlans.size(); i++) {
+            flattenNumFields[i] = 0;
             LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
             
             LogicalSchema mUserDefinedSchemaCopy = null;
@@ -111,8 +114,10 @@
                             if (fieldSchema.type == DataType.BAG) {
                                 // if it is bag, get the schema of tuples
                                 if (fieldSchema.schema!=null) {
-                                    if (fieldSchema.schema.getField(0).schema!=null)
+                                    if (fieldSchema.schema.getField(0).schema!=null) {
                                         innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
+                                        flattenNumFields[i] = innerFieldSchemas.size();
+                                    }
                                     for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                         fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                     }
@@ -120,6 +125,7 @@
                             } else if (fieldSchema.type == DataType.MAP) {
                                 //should only contain 1 schemafield for Map's value
                                 innerFieldSchemas = fieldSchema.schema.getFields();
+                                flattenNumFields[i] = 2;  // used for FLATTEN(null-map)
                                 LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
                                 fsForValue.alias = fieldSchema.alias + "::value";
 
@@ -129,6 +135,7 @@
                                 expSchema.addField(fsForKey);
                             } else { // DataType.TUPLE
                                 innerFieldSchemas = fieldSchema.schema.getFields();
+                                flattenNumFields[i] = innerFieldSchemas.size();
                                 for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                     fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                 }
@@ -213,6 +220,10 @@
     public boolean[] getFlattenFlags() {
         return flattenFlags;
     }
+
+    public int [] getFlattenNumFields() {
+        return flattenNumFields;
+    }
     
     public void setFlattenFlags(boolean[] flatten) {
         flattenFlags = flatten;
diff --git a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
index 7faa1fd..7526d7d 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
@@ -877,6 +877,7 @@
         }
         POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), foreach.getRequestedParallelism(), innerPlans, flattenList, schema);
+        poFE.setFlattenNumFields(gen.getFlattenNumFields());
         poFE.addOriginalLocation(foreach.getAlias(), foreach.getLocation());
         poFE.setResultType(DataType.BAG);
         logToPhyMap.put(foreach, poFE);
diff --git a/test/org/apache/pig/test/TestFlatten.java b/test/org/apache/pig/test/TestFlatten.java
index f315b84..c253afc 100644
--- a/test/org/apache/pig/test/TestFlatten.java
+++ b/test/org/apache/pig/test/TestFlatten.java
@@ -97,4 +97,270 @@
                 "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')" });
         Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
     }
+
+    @Test
+    public void testFlattenOnNullBag() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                Storage.bag(
+                    Storage.tuple("a","b"),
+                    Storage.tuple("c","d")),
+                Storage.bag(
+                    Storage.tuple("1","2"),
+                    Storage.tuple("3","4"))
+            ),
+            Storage.tuple(
+                null,
+                Storage.bag(
+                    Storage.tuple("11","12"),
+                    Storage.tuple("13","14"))
+            ),
+            Storage.tuple(
+                Storage.bag(
+                    Storage.tuple("k","l"),
+                    Storage.tuple("m","n")),
+                null
+            ),
+            Storage.tuple(
+                Storage.bag(
+                    null,
+                    Storage.tuple("e","f")),
+                Storage.bag(
+                    Storage.tuple("5","6"),
+                    Storage.tuple("7","8"))
+            ),
+            Storage.tuple(
+                Storage.bag(
+                    Storage.tuple("g","h"),
+                    Storage.tuple("i","j")),
+                Storage.bag(
+                    Storage.tuple("9","10"),
+                    null
+                    )
+            )
+        );
+
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag {(a1_1:int, a1_2:chararray)}, bag2:bag{(a2_1:chararray, a2_2:chararray)});");
+        pig.registerQuery("B = foreach A GENERATE FLATTEN(bag1), FLATTEN(bag2);");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        List<ExecJob> execJobs = pig.executeBatch();
+        for( ExecJob execJob : execJobs ) {
+          assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+        }
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')",
+
+                //flatten(null-bag) on schema {(a1_1:int, a1_2:chararray)} expands to (null, null)
+                "(null, null, '11', '12')", "(null, null, '13', '14')",
+                "('k', 'l', null, null)", "('m', 'n', null, null)",
+
+                //flatten(null-tuple-from-bag) on schema {(a1_1:int, a1_2:chararray)} also expands to (null, null)
+                "(null, null, '5', '6')", "(null, null, '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')",
+                "('g', 'h', '9', '10')", "('g', 'h', null, null)", "('i', 'j', '9', '10')", "('i', 'j', null, null)" });
+
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testFlattenOnNullMap() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                Storage.map("a","b",
+                            "c","d"),
+                Storage.map("1","2",
+                            "3","4")
+            )
+            ,
+            Storage.tuple(
+                null,
+                Storage.map("11","12",
+                            "13","14")
+            ),
+            Storage.tuple(
+                Storage.map("k","l",
+                            "m","n"),
+                null
+            )
+        );
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (map1:map [chararray], map2:map [chararray]);");
+        pig.registerQuery("B = foreach A GENERATE FLATTEN(map1), FLATTEN(map2);");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        List<ExecJob> execJobs = pig.executeBatch();
+        for( ExecJob execJob : execJobs ) {
+          assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+        }
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')",
+                // flatten(null-map) should expand to (null, null)
+                "(null, null, '11', '12')", "(null, null, '13', '14')", "('k', 'l', null, null)", "('m', 'n', null, null)"
+                });
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testFlattenOnNullTuple() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                Storage.tuple("a","b"),
+                Storage.tuple("1","2")
+            ),
+            Storage.tuple(
+                null,
+                Storage.tuple("3","4")
+            ),
+            Storage.tuple(
+                Storage.tuple("c","d"),
+                null
+            ),
+            Storage.tuple(
+                Storage.tuple("e", null),
+                Storage.tuple(null,"5")
+            )
+        );
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (tuple1:tuple (a1:chararray, a2:chararray), tuple2:tuple (a3:chararray, a4:chararray));");
+        pig.registerQuery("B = foreach A GENERATE FLATTEN(tuple1), FLATTEN(tuple2);");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        List<ExecJob> execJobs = pig.executeBatch();
+        for( ExecJob execJob : execJobs ) {
+          assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+        }
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "('a', 'b', '1', '2')", "(null, null, '3', '4')", "('c', 'd', null, null)", "('e', null, null, '5')" });
+
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testFlattenOnNullWithNoSchema() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                null,
+                Storage.bag(
+                    Storage.tuple("1","2"),
+                    Storage.tuple("3","4"))
+            ),
+
+            Storage.tuple(
+                Storage.bag(
+                    null,
+                    Storage.tuple("e","f")),
+                Storage.bag(
+                    Storage.tuple("5","6"),
+                    Storage.tuple("7","8"))
+            ),
+
+            Storage.tuple(
+                null,
+                Storage.map("9","10")
+            ),
+
+            Storage.tuple(
+                Storage.tuple("g","h"),
+                null
+            ),
+
+            Storage.tuple(
+                Storage.tuple("13", null),
+                Storage.tuple(null,"16")
+            )
+        );
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (a1, a2);");
+        pig.registerQuery("B = foreach A GENERATE FLATTEN(a1), FLATTEN(a2);");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        List<ExecJob> execJobs = pig.executeBatch();
+        for( ExecJob execJob : execJobs ) {
+          assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+        }
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "(null, '1', '2')", "(null, '3', '4')",  //since no schema, flatten(null) ==> one null
+                "(null, '5', '6')", "(null, '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')",
+                "(null, '9', '10')",
+                "('g', 'h', null)",
+                "('13', null, null, '16')"});
+
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testFlattenOnNullBagWithColumnPrune() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+            Storage.tuple(
+                1,
+                Storage.bag(
+                    Storage.tuple("a","b"),
+                    Storage.tuple("c","d")),
+                Storage.bag(
+                    Storage.tuple("1","2"),
+                    Storage.tuple("3","4"))
+            ),
+            Storage.tuple(
+                2,
+                null,
+                Storage.bag(
+                    Storage.tuple("11","12"),
+                    Storage.tuple("13","14"))
+            ),
+            Storage.tuple(
+                3,
+                Storage.bag(
+                    Storage.tuple("k","l"),
+                    Storage.tuple("m","n")),
+                null
+            ),
+            Storage.tuple(
+                4,
+                Storage.bag(
+                    null,
+                    Storage.tuple("e","f")),
+                Storage.bag(
+                    Storage.tuple("5","6"),
+                    Storage.tuple("7","8"))
+            ),
+            Storage.tuple(
+                5,
+                Storage.bag(
+                    Storage.tuple("g","h"),
+                    Storage.tuple("i","j")),
+                Storage.bag(
+                    Storage.tuple("9","10"),
+                    null
+                    )
+            )
+        );
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (a0:int, bag1:bag {(a1_1:int, a1_2:chararray)}, bag2:bag{(a2_1:chararray, a2_2:chararray)});");
+        pig.registerQuery("B = foreach A GENERATE FLATTEN(bag1), FLATTEN(bag2);");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+        List<ExecJob> execJobs = pig.executeBatch();
+        for( ExecJob execJob : execJobs ) {
+          assertTrue(execJob.getStatus() == ExecJob.JOB_STATUS.COMPLETED );
+        }
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {
+                "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')",
+                "(null, null, '11', '12')", "(null, null, '13', '14')",
+                "('k', 'l', null, null)", "('m', 'n', null, null)",
+                "(null, null, '5', '6')", "(null, null, '7', '8')", "('e', 'f', '5', '6')", "('e', 'f', '7', '8')",
+                "('g', 'h', '9', '10')", "('g', 'h', null, null)", "('i', 'j', '9', '10')", "('i', 'j', null, null)" });
+
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
 }
diff --git a/test/org/apache/pig/test/TestLineageFindRelVisitor.java b/test/org/apache/pig/test/TestLineageFindRelVisitor.java
index cd54065..8e893f5 100644
--- a/test/org/apache/pig/test/TestLineageFindRelVisitor.java
+++ b/test/org/apache/pig/test/TestLineageFindRelVisitor.java
@@ -181,7 +181,7 @@
         pig.executeBatch();
 
         List<Tuple> actualResults = data.get("output");
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(
                 new String[] {"('aaa', 'bbb')"});
         Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
     }
diff --git a/test/org/apache/pig/test/TestProjectRange.java b/test/org/apache/pig/test/TestProjectRange.java
index 660c411..515d5fa 100644
--- a/test/org/apache/pig/test/TestProjectRange.java
+++ b/test/org/apache/pig/test/TestProjectRange.java
@@ -554,8 +554,8 @@
         List<Tuple> expectedRes =
             Util.getTuplesFromConstantTupleStrings(
                     new String[] {
-                            "(10,20,30,40,50)",
-                            "(11,21,31,41,51)",
+                            "(10,20L,30,40,50)",
+                            "(11,21L,31,41,51)",
                     });
         Util.checkQueryOutputs(it, expectedRes);
     }
@@ -799,8 +799,8 @@
         List<Tuple> expectedRes =
             Util.getTuplesFromConstantTupleStrings(
                     new String[] {
-                            "(10,20,30,40,50)",
-                            "(11,21,31,41,51)",
+                            "(10,20L,30,40,50)",
+                            "(11,21L,31,41,51)",
                     });
         Util.checkQueryOutputs(it, expectedRes);
     }
@@ -828,10 +828,10 @@
         Iterator<Tuple> it = pigServer.openIterator("o");
 
         List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
+            Util.getTuplesFromConstantTupleStringAsByteArray(
                     new String[] {
-                            "(11,21,31,41,51)",
-                            "(10,20,30,40,50)",
+                            "('11','21','31','41','51')",
+                            "('10','20','30','40','50')",
                     });
         Util.checkQueryOutputs(it, expectedRes);
     }
@@ -858,10 +858,10 @@
         Iterator<Tuple> it = pigServer.openIterator("o");
 
         List<Tuple> expectedRes =
-            Util.getTuplesFromConstantTupleStrings(
+            Util.getTuplesFromConstantTupleStringAsByteArray(
                     new String[] {
-                            "(11,21,31,41,51)",
-                            "(10,20,30,40,50)",
+                            "('11','21','31','41','51')",
+                            "('10','20','30','40','50')",
                     });
         Util.checkQueryOutputs(it, expectedRes);
     }
diff --git a/test/org/apache/pig/test/TestTuple.java b/test/org/apache/pig/test/TestTuple.java
index 9dc32b4..680bdbf 100644
--- a/test/org/apache/pig/test/TestTuple.java
+++ b/test/org/apache/pig/test/TestTuple.java
@@ -70,6 +70,9 @@
             assertEquals(
                     "(12,[pig#scalability],,12,1.2,(innerTuple),{(innerTuple)})",
                     TupleFormat.format(tuple));
+
+            //check if TupleFormat can handle null tuple
+            assertEquals("()", TupleFormat.format(null));
         } catch (ExecException e) {
             e.printStackTrace();
             fail();
diff --git a/test/org/apache/pig/test/Util.java b/test/org/apache/pig/test/Util.java
index 8d4282d..d1c711b 100644
--- a/test/org/apache/pig/test/Util.java
+++ b/test/org/apache/pig/test/Util.java
@@ -535,16 +535,10 @@
      * @param actualResults Result of the executed Pig query
      * @param expectedResults Expected results List to validate against
      */
-     static public void checkQueryOutputs(Iterator<Tuple> actualResults,
+      static public void checkQueryOutputs(Iterator<Tuple> actualResults,
                                      List<Tuple> expectedResults) {
-         int count = 0;
-         for (Tuple expected : expectedResults) {
-             Tuple actual = actualResults.next();
-             count++;
-             Assert.assertEquals(expected.toString(), actual.toString());
-         }
-         Assert.assertEquals(expectedResults.size(), count);
-     }
+          checkQueryOutputs(actualResults, expectedResults.iterator(), null );
+      }
 
      /**
       * Helper function to check if the result of a Pig Query is in line with
@@ -560,7 +554,15 @@
               Tuple expected = expectedResults.next();
               Assert.assertTrue("Actual result has less records than expected results", actualResults.hasNext());
               Tuple actual = actualResults.next();
-              Assert.assertEquals(expected.toString(), actual.toString());
+
+              // If this tuple contains any bags, bags will be sorted before comparisons
+              if( !expected.equals(actual) ) {
+                  // Using string comparisons since error message is more readable
+                  // (only showing the part which differs)
+                  Assert.assertEquals(expected.toString(), actual.toString());
+                  // if above goes through, simply failing with object comparisons
+                  Assert.assertEquals(expected, actual);
+              }
               count++;
           }
           Assert.assertFalse("Actual result has more records than expected results", actualResults.hasNext());
@@ -613,9 +615,7 @@
          Collections.sort(actualResList);
          Collections.sort(expectedResList);
 
-         Assert.assertEquals("Comparing actual and expected results. ",
-                 expectedResList, actualResList);
-
+         checkQueryOutputs(actualResList.iterator(), expectedResList);
     }
 
     /**
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
index 15d9716..9018a24 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
@@ -14,7 +14,7 @@
 |   |
 |   Project[tuple][*] - scope-33
 |
-|---a: Load(file:///tmp/input1:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','','a_1-0','scope','false')) - scope-31
+|---a: Load(file:///tmp/input1:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','','a_1-0','scope','false')) - scope-31
 Tez vertex scope-32
 # Plan on vertex
 POValueOutputTez - scope-38	->	 [scope-22]
@@ -40,4 +40,4 @@
         |   |
         |   |---Project[bytearray][1] - scope-4
         |
-        |---a: Load(file:///tmp/input1:org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader) - scope-0
\ No newline at end of file
+        |---a: Load(file:///tmp/input1:org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader) - scope-0
diff --git a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
index 5111019..9977eb4 100644
--- a/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
+++ b/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
@@ -14,7 +14,7 @@
 |   |
 |   Project[tuple][*] - scope-38
 |
-|---b: Load(file:///tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9V01sG0UUftk4ieukP2nTVgiFOjTApfKWn0NRiiAusTC4cVRboDqqlPF67Gy7uzOdnU3XPSB6gQMSXEACCSSEOPbEiSviwIUeQEJCnKre4QISCAmVN7O79tpxk6IEfFh7Zt6+3+9983z7F5jwBawy0SkQTqxNWuB2p9Ak1jXqtQqbpMUYL9CQWoG0mUe9ju2hyGbXty3iVEiXigJ3iOcX1uK9NVxB9BkzwGjAQVRUbS87TtnjgazABOMu4RJOV9CoGRk10ahpu9wx0YxjXgwcaV8kfCkU8MSQa0pKmyxUORVEMjFoMVuBnFsSzF1pdah/Hd6EMbTpvka7voTDlatki8RGUH8FptwKJVsUz46kziq2L/Ew41a5VjFegUn3EmNSrzIVOOTWWFsOm5nRu3WW3su66XXIMdsLo0JSdgtJ4HE4hqE0ZFzc0V7ot2eVm5H4K8TfxLOJqZ+//ubExvfjYJQg5zDSKhELE1OGA3JTUH+TOa2Qv/iS1jlzI4vPI+pnqH3KbjsYQyuLu6YdM5qq86tg2C1MtG8xTiUcjZKJ8h2zJoXtdZbCJKpFGYuhmct7AZ6gDlGHxEl8QhhWS0ysoL6+bzMNmLX9FSKcbp0K1/aIpK0GHLjmsRtezb5JGzCNiKzZLVr1nG4ZpjxWltT1K3ByALtrglnU9zEYCcdSARYZcyjxEDA5W8uphtBAWYc526+zIi2hp5J6tLUsBOlKMNYbCBfG9eEl6lMZy0+rHCMk24g8CdX14SaJE2RGCTKHEmQOJMhMejJJzxLqRzwgyOpdrjBvrBcR2T4qd4mEsyM70mGdvsZI1KzpLxWwDLhDL5JrVEjID7/fIpKY9Z6EaufKXgo+HM9AjSeIZQUu8o2uwbKUykarDHOCXg8wWNpaI4I4DnVs3y1Crp8IhC1xbKJLNoMLrUDCow8OByOf1FIJHxx00FvSoXVBLJWKp4bf5dRLqCUliHpmmbAxVAyRWRrNicopFsiUhRwngnoaWxLW9gQLTdhmmrDRj3GhAPHynhRf0ilVZV7YlT76tdMUh2wi4cmRAFSvmSnWQSZROTkIcB8/oYSxJueKGY/1mVG3mWLx8NYP8x9/Sz4dh7EyZHzsds1DYzcyMdFd2U9ALlaHirn83p3V2Z/+/siAcTSvQFKGSdZuY8MPoC7k9+OPWj4W6niKe3EtqkWKoYswI6gMhFeTRAZIbpNRBwySdbV5lVpYQm7EdG30rhx1Xoi57k5evPXjJ3/9itXDztsiTqDyqmM5pV7S6TX0Otqd3OGu2dMlgHaFImXmpS8BpGoVRz/8yQYcsv14G29kD2+AHNuiQl2X6jo4znvsXiSdalv3ud9Q9KGyVMN9h+pNPC5D1pdEyAvMwUa1mBO4HnbP8dQI0UOgaofmfke4sm2vHyqEKtUndMKn4+28WhyKOuYZrtolKRXoUkEYb51O6g79avabJQWEsidph4qj9z774o9b75wzVH/FQBBwpC+3GrhNKt6+/eH89Ad331Xe8f7sMaj99f3HwQXip0CQwdvBwp1VJlcpxbJXYLptU6dVi2/AZ4cJCLuIBcKikcBisiz1X0LqzLYDz6pxhbdHhhWU4iMlJihx1H3TmzWj6aEr6ZIuWWGgZAupkuVSBYvzpZZmuG1brxcSMkFuDNOVxSQ/PZTk3SNM5a8IGakvzOkW9S1hc1WO+NrMeMSl8e/eSHFq54Qiz+TwnwAJJOvgdCTUZJZv41SdX7M7ee1DPpJUipcVKZ4c8j9JcH9kxmH9gOUQ319NHFqHrJqHl0UH0XBsfftsWoHD0dSAEn17cyH+TRm+yZqBjTO6V0AHa6gTr3LspvBBo7Shts/qWqnHC+pxPhxVy/M7qjB7es5rPeG2/h3BsjA86v+r4xRHa7ujARgCfyiWV/tnBvD9+H5Qklo/p36FI4/ODVg8vXtHXX6Ijro50FHqx4WcepbUg+rZpLwLIhralnpooG2MRMTGjiou9/RsaD3/PyI2RicQEREGQv3DeeP9/PNzX125l5C+kSpg4iRKRn9y9m0GW/r8zInm753vvuyZ7bXNhvarePu3k39OZut3ewI5RMPIEsyHOyVgXo+hO73432QfMMX/ACAlM/I=','b_1-1','scope','true')) - scope-8
+|---b: Load(file:///tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9V01sG0UUftk4ieskbdK/CKG2bhvKBXlpewCUQxq3sTBs46iOQDit1Mnu2Nl2d2c7O5uue0D0AgckuIAEEkgIwa2nnhA3xIELPYCEhDhVPSLBBSQQEipvZnftteMmRQn4sPbMvH3vzXvf+97znV9gJOCwxHirRHxirtOSb7dKa8S8Tj2rtE4sxvwSjagZCpt51GvZHoqstwPbJI5B2pSXfId4QWk52VvGFcSfIQ20BkyiolpzwXGqnh8KA0aY7xJfwEkDjeqxUR2N6rbrOzqacfSLoSPsi8Sfizg81eealFImSzWfciIY77WYN6DgVjhzF60WDW7A6zCENt2XaTsQsM+4RjZIYgT1GzDmGpRsUDybypwZdiDwMOfWfKVi2IBR9xJjQq1yBux166wp+s1MqN0Vlt3Lu9l15GO0jw+6krRbSi+eXEfTpIacizvKC/X2tHQzFn+RBOt4NjL201dfH7763TBoFSg4jFgVYmJgqrBHrHMarDPHivz5c0rnxM08Pqfkz0j5lN90MIRWZrcNO0Y0k+eXQLMtDHRgMp8K2B8HE+Vbel1w22vNRemtZkUihmZe2wnwOHWIPCRO6hPCsFZhfBH1dX2bbMC0HSwS7rRXKHdtjwhqNWDPdY/d9Or2LdqAcURk3bZozXPaVRjzWFVQNzBgpge7y5yZNAjwMgIOZC5YZsyhxJtbhakmeiSotxS6FZs6FsJKW60iIm31viwUBaBVOGgHK6xMK7E8tRY4J20p3UAYMV8dXqIBFYn8uIw9QrWJiBRQW+0vniRwehw4vS9wek/g9LRW07Ch5+OIEwTfStunyukyIj5A5S4R8OzASnVYq6sxFtXr6gsrpyBC36EXyXXKBRT737eIIPpKR0KWubETIPTfp5v7iQaMENMMXeQhlYMFIaQNqwoHOb0R4mWptUw4cRzq2IFbhkI3EAhn4thEpWwCF0qBgCcffR28+aiSSnli0kFvSYuucGLKUDzd/65PvZRyMoKoZ5pxG6+KV2SmQnmqcoyFImOh4BNOPYUtAcs7goUicj1L5OjHMJeAuLAjxZdUSGWaj29LK93cKepDlhFwaiAA5Wt6ho2QYWRMJgEe4icSMLTm+5IxD3QZU5WZZPfo9vdHPvyGfDwMQ1XIBcgCip+GbuYSAryym4CcrfUlc+Gde0vTP/79gQbDaF6CpAqjrNnEgu9BXeQ/TD5yeSxS9ynvxLU4FxnmLsMEpyLkXl0QESLpjcYV0EvitbVr1MQU+lpC41qnFcnzUsKB94r8jR8++utXzB5W3gZxQhnXkEsavPjl1VMbP9/9XMPOI5VoiROgrlqUOyr6mlrHu6NbtKgd9Q50i0suZ162dyDDy2t2ozPagL12kGxjI/ewcRTYBuWyy8oucsjvNIUyadWaigaChmQXGcQ67jtUbeJxFfKBIFycZw7Wscmc0PWwuA5lJo8OQGW1rO32DRc37XWvCpEM9YwK+HiyXZSLvXFBnfFlNaWpApUqiJKt2RQWmWx2aymDk6onaIvy/Q8++eyP2289r8nyS3DCYaorhx10jfI377x/ZPy9+2+nkElGll7tr+w+Ds6TIAOCHDYPE3eWmFiiFNNuwHhTtvd60iDP9vMTFhkLuUljgdl0Wem+hMyab4aeWfcl3p7oV1BJjqQYp8SR7agzosZDR1vQOZWy0z0pO55JWSGTsCRecnkm2rSt1idSrkHqjLKZxSCf7gvy9jfMxK8MOaH66bhFA5PbvkxH0lVzHnFp8rszcRzbOqBIQwX8A0FCwVo4PHE50BWbOIwXl+1WUflQjCWl4guSM2f6/E8D3J20ccbfYzokCJZSh1YhL8foBd5CNBxY3TzSGrAvHipQomvvUIT/bvob3Vpo42jvldDBOurETo/VFD1qAtfk9lmVK/k4Jx/z0aBczm+p4kxHz7zSE22q3wEsC/3/EP7VcYajld3BAIzAfyyWl/t6D75P7AYlyfVz8lc08Giux+LJ7Svq8mNU1K2eipI/KgX5rMrHuhpdjG0QcUXZkg9LPsyBiDC3VHG5o8dUev5/RJiDA4iIiNSc0Hj13eILB7+48iAzJ2xyEiXj/0C7NqLNffrM4bXfW9/e7ZjtlI2p/Crf+W3mz9H8yv2OQAHRMDAFR6OtAnBUTalbvfjfRB8wxP8AYS1DMw==','b_1-1','scope','true')) - scope-8
 Tez vertex scope-37
 # Plan on vertex
 POValueOutputTez - scope-43	->	 [scope-29]
@@ -48,4 +48,4 @@
             |   |
             |   |---Project[bytearray][1] - scope-4
             |
-            |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
\ No newline at end of file
+            |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0