PIG-3916: isEmpty should not be early terminating (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.12@1592001 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 924c351..73d6a62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,13 @@
 
 Pig Change Log
 
+Release 0.12.2 (unreleased changes)
+
+BUG FIXES
+
+PIG-3916: isEmpty should not be early terminating (rohini)
+
+
 Release 0.12.1
 
 INCOMPATIBLE CHANGES
diff --git a/src/org/apache/pig/builtin/IsEmpty.java b/src/org/apache/pig/builtin/IsEmpty.java
index d13d500..ab73021 100644
--- a/src/org/apache/pig/builtin/IsEmpty.java
+++ b/src/org/apache/pig/builtin/IsEmpty.java
@@ -22,7 +22,6 @@
 
 import org.apache.pig.FilterFunc;
 import org.apache.pig.PigException;
-import org.apache.pig.TerminatingAccumulator;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
@@ -32,14 +31,12 @@
 /**
  * Determine whether a bag or map is empty.
  */
-public class IsEmpty extends FilterFunc implements TerminatingAccumulator<Boolean> {
-
-    private boolean isEmpty = true;
+public class IsEmpty extends FilterFunc {
 
     @Override
     public Boolean exec(Tuple input) throws IOException {
         try {
-            Object values = input.get(0);        
+            Object values = input.get(0);
             if (values instanceof DataBag)
                 return ((DataBag)values).size() == 0;
             else if (values instanceof Map)
@@ -55,24 +52,4 @@
         }
     }
 
-    @Override
-    public boolean isFinished() {
-        return !isEmpty;
-    }
-
-    @Override
-    public void accumulate(Tuple b) throws IOException {
-        isEmpty &= exec(b);
-    }
-
-    @Override
-    public void cleanup() {
-        isEmpty = true;
-    }
-
-    @Override
-    public Boolean getValue() {
-        return isEmpty;
-    }
-
 }
diff --git a/test/org/apache/pig/test/TestAccumulator.java b/test/org/apache/pig/test/TestAccumulator.java
index b979649..1cdb468 100644
--- a/test/org/apache/pig/test/TestAccumulator.java
+++ b/test/org/apache/pig/test/TestAccumulator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -25,6 +26,8 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -52,7 +55,6 @@
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         // pigServer = new PigServer(ExecType.LOCAL);
         pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");
-        pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2");
         pigServer.getPigContext().getProperties().setProperty("pig.exec.nocombiner", "true");
         // reducing the number of retry attempts to speed up test completion
         pigServer.getPigContext().getProperties().setProperty("mapred.map.max.attempts","1");
@@ -411,6 +413,33 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test // PIG-3916 regression test: IsEmpty used as a guard in the same foreach as SUM over cogrouped bags.
+    public void testAccumWithIsEmpty() throws IOException{
+        pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1"); // overrides the "2" set on pigServer earlier in this file; presumably so each batch carries one tuple -- confirm
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+        pigServer.registerQuery("C = cogroup A by id outer, B by id outer;"); // outer on both sides; presumably lets a group keep an empty bag for one input -- confirm
+        pigServer.registerQuery("D = foreach C generate group," +
+                "(int)(IsEmpty(A) ? 0 : SUM(A.id)) as suma," +
+                "(int)(IsEmpty(B) ? 0 : SUM(B.id)) as sumb;");
+
+        List<Tuple> expected = new ArrayList<Tuple>(); // rows are (group, suma, sumb), listed in ascending group order
+        expected.add(tuple(100, 200, 200));
+        expected.add(tuple(200, 200, 400));
+        expected.add(tuple(300, 900, 300));
+        expected.add(tuple(400, 400, 0)); // sumb of 0 presumably comes from the IsEmpty(B) branch -- depends on INPUT_FILE2 contents, confirm
+
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        List<Tuple> actual = new ArrayList<Tuple>();
+
+        while(iter.hasNext()) { // drain the result iterator into a list so it can be sorted and compared
+            actual.add(iter.next());
+        }
+        Collections.sort(actual); // sort before comparing so the check does not depend on the order rows are returned in
+        assertEquals(expected, actual);
+    }
+
     @Test
     public void testAccumWithDistinct() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
@@ -594,6 +623,7 @@
 
         checkAccumulatorOff("C");
         pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
+
     }
 
     private void checkAccumulatorOff(String alias) {
@@ -691,4 +721,6 @@
                     });
         Util.checkQueryOutputsAfterSort(iter, expectedRes);
     }
-}
\ No newline at end of file
+
+
+}