PIG-3021: Split results missing records when there is null values in the column comparison

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1796133 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 4790219..05c2591 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -36,6 +36,8 @@
  
 IMPROVEMENTS
 
+PIG-3021: Split results missing records when there is null values in the column comparison (jeffjee617, cheolsoo via daijy)
+
 PIG-5211: Optimize Nested Limited Sort (jins via daijy)
 
 PIG-5214: search any substring in the input string (rainer-46 via daijy)
diff --git a/build.xml b/build.xml
index e70aa99..307b863 100644
--- a/build.xml
+++ b/build.xml
@@ -312,7 +312,7 @@
     </target>
 
     <target name="eclipse-files"
-            depends="init,ant-eclipse-download,ivy-compile,ivy-test"
+            depends="compile-test,ant-eclipse-download"
             description="Create eclipse project files">
         <pathconvert property="eclipse.project">
             <path path="${basedir}"/>
@@ -336,7 +336,7 @@
                 <source path="${src.shims.dir}"/>
                 <source path="${src.shims.test.dir}"/>
                 <source path="tutorial/src"/>
-                <source path="${test.src.dir}" excluding="e2e/pig/udfs/java/|resources/"/>
+                <source path="${test.src.dir}" excluding="e2e/pig/udfs/java/|resources/|perf/"/>
                 <output path="${build.dir.eclipse-main-classes}" />
                 <library pathref="eclipse.classpath" exported="true" />
                 <!--library pathref="classpath" exported="false"/-->
@@ -391,7 +391,6 @@
         <mkdir dir="${test.build.classes}" />
         <mkdir dir="${src.gen.dot.parser.dir}" />
         <mkdir dir="${src.gen.textdata.parser.dir}" />
-        <move file="${ivy.lib.dir}/javacc-${javacc.version}.jar" tofile="${javacc.home}/javacc.jar"/>
         <tstamp>
             <format property="timestamp" pattern="MMM dd yyyy, HH:mm:ss" />
         </tstamp>
@@ -474,11 +473,13 @@
     <!-- Java Compiler Compiler, generate Parsers                           -->
     <!-- ================================================================== -->
     <target name="cc-compile" depends="init, ivy-compile" description="Create and Compile Parser">
+        <move file="${ivy.lib.dir}/javacc-${javacc.version}.jar" tofile="${javacc.home}/javacc.jar"/>
         <javacc target="${src.dir}/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj" outputdirectory="${src.gen.script.parser.dir}" javacchome="${javacc.home}" />
         <javacc target="${src.dir}/org/apache/pig/tools/parameters/PigFileParser.jj" outputdirectory="${src.gen.param.parser.dir}" javacchome="${javacc.home}" />
         <javacc target="${src.dir}/org/apache/pig/tools/parameters/ParamLoader.jj" outputdirectory="${src.gen.param.parser.dir}" javacchome="${javacc.home}" />
         <jjtree target="${test.src.dir}/org/apache/pig/test/utils/dotGraph/DOTParser.jjt" outputdirectory="${src.gen.dot.parser.dir}" javacchome="${javacc.home}" />
         <javacc target="${src.gen.dot.parser.dir}/DOTParser.jj" outputdirectory="${src.gen.dot.parser.dir}" javacchome="${javacc.home}" />
+        <move file="${javacc.home}/javacc.jar" tofile="${ivy.lib.dir}/javacc-${javacc.version}.jar"/>
     </target>
 
     <target name="gen" depends="genTreeParser"
diff --git a/src/org/apache/pig/parser/AstPrinter.g b/src/org/apache/pig/parser/AstPrinter.g
index 792f6ec..c0a90fe 100644
--- a/src/org/apache/pig/parser/AstPrinter.g
+++ b/src/org/apache/pig/parser/AstPrinter.g
@@ -601,7 +601,7 @@
 ;
 
 split_otherwise
-    : ^( OTHERWISE alias { sb.append(" " + $OTHERWISE.text); } )
+    : ^( OTHERWISE alias { sb.append(" " + $OTHERWISE.text); } ( ALL { sb.append(" " + $ALL.text); } )? )
 ;
 
 col_ref : alias_col_ref | dollar_col_ref
diff --git a/src/org/apache/pig/parser/AstValidator.g b/src/org/apache/pig/parser/AstValidator.g
index 6661871..0090ac8 100644
--- a/src/org/apache/pig/parser/AstValidator.g
+++ b/src/org/apache/pig/parser/AstValidator.g
@@ -611,7 +611,7 @@
    }
 ;
 
-split_otherwise : ^( OTHERWISE alias )
+split_otherwise : ^( OTHERWISE alias ALL? )
    {
        aliases.add( $alias.name );
    }
diff --git a/src/org/apache/pig/parser/LogicalPlanBuilder.java b/src/org/apache/pig/parser/LogicalPlanBuilder.java
index 2310c7e..e241505 100644
--- a/src/org/apache/pig/parser/LogicalPlanBuilder.java
+++ b/src/org/apache/pig/parser/LogicalPlanBuilder.java
@@ -64,6 +64,7 @@
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.expression.BinCondExpression;
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
 import org.apache.pig.newplan.logical.expression.LessThanExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -276,7 +277,7 @@
         return buildOp ( loc, op, alias, inputAlias, null );
     }
 
-    String buildSplitOtherwiseOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias)
+    String buildSplitOtherwiseOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias, boolean allowNulls)
             throws ParserValidationException, PlanGenerationFailureException {
         LogicalExpressionPlan splitPlan = new LogicalExpressionPlan();
         Operator losplit = lookupOperator(inputAlias);
@@ -314,7 +315,12 @@
         }
         // using De Morgan's law (!A && !B) == !(A || B)
         currentExpr = new NotExpression(splitPlan, currentExpr);
-
+        if (allowNulls) {
+            // add IsNull expression to the condition: i.e. (A || B) IS NULL OR !(A || B).
+            // this is needed to catch null values in otherwise branch.
+            LogicalExpression isNull = new IsNullExpression( splitPlan, currentExpr );
+            currentExpr = new OrExpression(splitPlan, isNull, currentExpr);
+        }
         try {
             // Going through all the ProjectExpressions that were cloned
             // and updating the attached operators from its original
diff --git a/src/org/apache/pig/parser/LogicalPlanGenerator.g b/src/org/apache/pig/parser/LogicalPlanGenerator.g
index 71b95ae..f6b4ab7 100644
--- a/src/org/apache/pig/parser/LogicalPlanGenerator.g
+++ b/src/org/apache/pig/parser/LogicalPlanGenerator.g
@@ -1716,13 +1716,14 @@
 split_otherwise throws PlanGenerationFailureException
 scope GScope;
 @init {
+	boolean allowNulls = false;
     $GScope::currentOp = builder.createSplitOutputOp();
 }
- : ^( OTHERWISE alias )
+ : ^( OTHERWISE alias ( ALL { allowNulls = true; } )? )
   {
        SourceLocation loc = new SourceLocation( (PigParserNode)$alias.start );
        builder.buildSplitOtherwiseOp( loc, (LOSplitOutput)$GScope::currentOp, $alias.name,
-           $statement::inputAlias);
+           $statement::inputAlias, allowNulls);
   }
 ;
 
diff --git a/src/org/apache/pig/parser/QueryParser.g b/src/org/apache/pig/parser/QueryParser.g
index d7bc019..556b1d4 100644
--- a/src/org/apache/pig/parser/QueryParser.g
+++ b/src/org/apache/pig/parser/QueryParser.g
@@ -496,7 +496,7 @@
 split_branch : identifier_plus IF cond -> ^( SPLIT_BRANCH identifier_plus cond )
 ;
 
-split_otherwise : identifier_plus OTHERWISE^
+split_otherwise : identifier_plus OTHERWISE ALL? -> ^( OTHERWISE identifier_plus ALL? )
 ;
 
 split_branches : COMMA! split_branch split_branches?
diff --git a/test/org/apache/pig/test/TestSplit.java b/test/org/apache/pig/test/TestSplit.java
index 1d108d0..4b0d810 100644
--- a/test/org/apache/pig/test/TestSplit.java
+++ b/test/org/apache/pig/test/TestSplit.java
@@ -17,12 +17,17 @@
  */
 package org.apache.pig.test;
 
-import java.io.File;
+import static org.junit.Assert.assertEquals;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.junit.Before;
@@ -30,46 +35,77 @@
 import org.junit.Test;
 
 public class TestSplit {
-    private static final String[] data = new String[] { "1", "2", "3", "4", "5", "6" };
-    private static File file;
-    private PigServer pig;
+    static private PigServer pig;
+    static private Data data;
 
     @BeforeClass
-    public static void oneTimeSetUp() throws IOException {
-        file = Util.createLocalInputFile("split_input", data);
+    public static void setup() throws ExecException, IOException {
+        pig = new PigServer(ExecType.LOCAL);
     }
 
     @Before
     public void setUp() throws Exception {
-        pig = new PigServer(Util.getLocalTestMode());
+        data = resetData(pig);
+        data.set("input",
+                tuple(1),
+                tuple(2),
+                tuple(3),
+                tuple(4),
+                tuple((Integer)null),
+                tuple(5),
+                tuple(6)
+        );
     }
 
     @Test
     public void testSplit1() throws IOException {
         String query = 
-            "a = load '" + Util.encodeEscape(file.getAbsolutePath()) + "' as (id:int);" + 
-            "split a into b if id > 3, c if id < 3, d otherwise;"
+            "a = load 'input' using mock.Storage() as (id:int);" +
+            "split a into b if id > 3, c if id < 3, d otherwise;" +
+            "d_sorted = order d by id;" +
+            "store d_sorted into 'output' using mock.Storage();"
             ;
 
         Util.registerMultiLineQuery(pig, query);
-        Iterator<Tuple> it = pig.openIterator("d");
-
-        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { "(3)" });
-        Util.checkQueryOutputs(it, expectedRes);
+        List<Tuple> out = data.get("output");
+        assertEquals(1, out.size());
+        assertEquals(tuple(3), out.get(0));
     }
     
     @Test
     public void testSplit2() throws IOException {
         String query = 
-            "a = load '" + Util.encodeEscape(file.getAbsolutePath()) + "' as (id:int);" + 
-            "split a into b if id % 2 == 0, d otherwise;"
+            "a = load 'input' using mock.Storage() as (id:int);" +
+            "split a into b if id % 2 == 0, d otherwise;" +
+            "d_sorted = order d by id;" +
+            "store d_sorted into 'output' using mock.Storage();"
             ;
 
-        Util.registerMultiLineQuery(pig, query);
-        Iterator<Tuple> it = pig.openIterator("d");
+         Util.registerMultiLineQuery(pig, query);
+         List<Tuple> out = data.get("output");
+         assertEquals(3, out.size());
+         assertEquals(tuple(1), out.get(0));
+         assertEquals(tuple(3), out.get(1));
+         assertEquals(tuple(5), out.get(2));
+    }
 
-        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { "(1)", "(3)", "(5)" });
-        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    @Test
+    public void testOtherwiseAll() throws IOException {
+    String query =
+        "a = load 'input' using mock.Storage() as (id:int);" +
+        "split a into b if id % 2 == 0, d otherwise all;" +
+        "d_sorted = order d by id;" +
+        "store d_sorted into 'output' using mock.Storage();"
+        ;
+        Util.registerMultiLineQuery(pig, query);
+        List<Tuple> out = data.get("output");
+        // Null should be passed through since the ALL keyword was specified in
+        // the otherwise branch
+        assertEquals(4, out.size());
+        assertEquals(tuple((Integer)null), out.get(0));
+        assertEquals(tuple(1), out.get(1));
+        assertEquals(tuple(3), out.get(2));
+        assertEquals(tuple(5), out.get(3));
     }
     
     @Test
@@ -77,26 +113,33 @@
         String query =
             "define split_into_two (A,key) returns B, C {" +
             "    SPLIT $A INTO $B IF $key<4, $C OTHERWISE;" +
-            "};"  +
-            "a = load '" + Util.encodeEscape(file.getAbsolutePath()) + "' as (id:int);" +
-            "B, C = split_into_two(a, id);"
+            "};" +
+            "a = load 'input' using mock.Storage() as (id:int);" +
+            "b, c = split_into_two(a, id);" +
+            "b_sorted = order b by id;" +
+            "c_sorted = order c by id;" +
+            "store b_sorted into 'output1' using mock.Storage();" +
+            "store c_sorted into 'output2' using mock.Storage();"
             ;
 
         Util.registerMultiLineQuery(pig, query);
-        Iterator<Tuple> it = pig.openIterator("B");
+        List<Tuple> out = data.get("output1");
+        assertEquals(3, out.size());
+        assertEquals(tuple(1), out.get(0));
+        assertEquals(tuple(2), out.get(1));
+        assertEquals(tuple(3), out.get(2));
 
-        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { "(1)", "(2)", "(3)" });
-        Util.checkQueryOutputsAfterSort(it, expectedRes);
-
-        it = pig.openIterator("C");
-        expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { "(4)", "(5)", "(6)" });
-        Util.checkQueryOutputsAfterSort(it, expectedRes);
+        out = data.get("output2");
+        assertEquals(3, out.size());
+        assertEquals(tuple(4), out.get(0));
+        assertEquals(tuple(5), out.get(1));
+        assertEquals(tuple(6), out.get(2));
     }
 
     @Test(expected=FrontendException.class)
     public void testSplitNondeterministic() throws IOException {
         String query = 
-            "a = load '" + Util.encodeEscape(file.getAbsolutePath()) + "' as (id:int);" + 
+            "a = load 'input' using mock.Storage() as (id:int);" +
             "split a into b if RANDOM() < 0.5, d otherwise;"
             ;