PIG-5224: Extra foreach from ColumnPrune preventing Accumulator usage (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1796190 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index c8d1947..66741bd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -101,6 +101,8 @@
  
 BUG FIXES
 
+PIG-5224: Extra foreach from ColumnPrune preventing Accumulator usage (knoguchi)
+
 PIG-5235: Typecast with as-clause fails for tuple/bag with an empty schema (knoguchi)
 
 PIG-5238: Fix datetime related test issues after PIG-4748 (szita)
diff --git a/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java b/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
index a458173..7d344bd 100644
--- a/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
+++ b/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
@@ -267,6 +267,14 @@
 
     @Override
     public void visit( LOCogroup cg ) throws FrontendException {
+        LogicalPlan p = (LogicalPlan) cg.getPlan();
+        List<Operator> successors = p.getSuccessors(cg);
+        // if there is already a LOForEach after this operator,
+        // skip adding another foreach since it can conflict with
+        // AccumulatorOptimizerUtil.addAccumulator()
+        if (successors.size() == 1 && successors.get(0) instanceof LOForEach) {
+            return;
+        }
         addForEachIfNecessary(cg);
     }
 
diff --git a/test/org/apache/pig/test/TestAccumulator.java b/test/org/apache/pig/test/TestAccumulator.java
index fc5d75c..a5994de 100644
--- a/test/org/apache/pig/test/TestAccumulator.java
+++ b/test/org/apache/pig/test/TestAccumulator.java
@@ -739,12 +739,13 @@
     public void testAccumAfterNestedOp() throws IOException, ParserException{
         // test group by
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
-        pigServer.registerQuery("B = group A by id;");
-        pigServer.registerQuery("C = foreach B " +
-                        "{ o = order A by id; " +
+        pigServer.registerQuery("B = foreach A generate id;"); //additional test to enable columnprune(PIG-5224)
+        pigServer.registerQuery("C = group B by id;");
+        pigServer.registerQuery("D = foreach C " +
+                        "{ o = order B by id; " +
                         "  generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; ");
 
-        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
         List<Tuple> expectedRes =
             Util.getTuplesFromConstantTupleStrings(
                     new String[] {