PIG-3916: isEmpty should not be early terminating (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/branches/branch-0.12@1592001 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 924c351..73d6a62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,13 @@
Pig Change Log
+Release 0.12.2 (unreleased changes)
+
+BUG FIXES
+
+PIG-3916: isEmpty should not be early terminating (rohini)
+
+
Release 0.12.1
INCOMPATIBLE CHANGES
diff --git a/src/org/apache/pig/builtin/IsEmpty.java b/src/org/apache/pig/builtin/IsEmpty.java
index d13d500..ab73021 100644
--- a/src/org/apache/pig/builtin/IsEmpty.java
+++ b/src/org/apache/pig/builtin/IsEmpty.java
@@ -22,7 +22,6 @@
import org.apache.pig.FilterFunc;
import org.apache.pig.PigException;
-import org.apache.pig.TerminatingAccumulator;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
@@ -32,14 +31,12 @@
/**
* Determine whether a bag or map is empty.
*/
-public class IsEmpty extends FilterFunc implements TerminatingAccumulator<Boolean> {
-
- private boolean isEmpty = true;
+public class IsEmpty extends FilterFunc {
@Override
public Boolean exec(Tuple input) throws IOException {
try {
- Object values = input.get(0);
+ Object values = input.get(0);
if (values instanceof DataBag)
return ((DataBag)values).size() == 0;
else if (values instanceof Map)
@@ -55,24 +52,4 @@
}
}
- @Override
- public boolean isFinished() {
- return !isEmpty;
- }
-
- @Override
- public void accumulate(Tuple b) throws IOException {
- isEmpty &= exec(b);
- }
-
- @Override
- public void cleanup() {
- isEmpty = true;
- }
-
- @Override
- public Boolean getValue() {
- return isEmpty;
- }
-
}
diff --git a/test/org/apache/pig/test/TestAccumulator.java b/test/org/apache/pig/test/TestAccumulator.java
index b979649..1cdb468 100644
--- a/test/org/apache/pig/test/TestAccumulator.java
+++ b/test/org/apache/pig/test/TestAccumulator.java
@@ -17,6 +17,7 @@
*/
package org.apache.pig.test;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -25,6 +26,8 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -52,7 +55,6 @@
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
// pigServer = new PigServer(ExecType.LOCAL);
pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");
- pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2");
pigServer.getPigContext().getProperties().setProperty("pig.exec.nocombiner", "true");
// reducing the number of retry attempts to speed up test completion
pigServer.getPigContext().getProperties().setProperty("mapred.map.max.attempts","1");
@@ -411,6 +413,33 @@
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAccumWithIsEmpty() throws IOException{
+ pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1");
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+ pigServer.registerQuery("C = cogroup A by id outer, B by id outer;");
+ pigServer.registerQuery("D = foreach C generate group," +
+ "(int)(IsEmpty(A) ? 0 : SUM(A.id)) as suma," +
+ "(int)(IsEmpty(B) ? 0 : SUM(B.id)) as sumb;");
+
+ List<Tuple> expected = new ArrayList<Tuple>();
+ expected.add(tuple(100, 200, 200));
+ expected.add(tuple(200, 200, 400));
+ expected.add(tuple(300, 900, 300));
+ expected.add(tuple(400, 400, 0));
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ List<Tuple> actual = new ArrayList<Tuple>();
+
+ while(iter.hasNext()) {
+ actual.add(iter.next());
+ }
+ Collections.sort(actual);
+ assertEquals(expected, actual);
+ }
+
@Test
public void testAccumWithDistinct() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
@@ -594,6 +623,7 @@
checkAccumulatorOff("C");
pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
+
}
private void checkAccumulatorOff(String alias) {
@@ -691,4 +721,6 @@
});
Util.checkQueryOutputsAfterSort(iter, expectedRes);
}
-}
\ No newline at end of file
+
+
+}