PIG-5055: Infinite loop with join by fixed index (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1769172 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8334e44..fe9bf12 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@
  
 BUG FIXES
 
+PIG-5055: Infinite loop with join by fixed index (knoguchi)
+
 PIG-5049: Cleanup e2e tests turing_jython.conf (Daniel Dai)
 
 PIG-5033: MultiQueryOptimizerTez creates bad plan with union, split and FRJoin (rohini,tmwoordruff via rohini)
diff --git a/src/org/apache/pig/newplan/logical/rules/AddForEach.java b/src/org/apache/pig/newplan/logical/rules/AddForEach.java
index 01f921a..463a3ad 100644
--- a/src/org/apache/pig/newplan/logical/rules/AddForEach.java
+++ b/src/org/apache/pig/newplan/logical/rules/AddForEach.java
@@ -95,7 +95,7 @@
             }
             
             Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
-            if (outputUids==null)
+            if (outputUids==null || outputUids.size() == 0 )
                 return false;
             
             LogicalSchema schema = op.getSchema();
diff --git a/test/org/apache/pig/test/TestNewPlanColumnPrune.java b/test/org/apache/pig/test/TestNewPlanColumnPrune.java
index b2c5ebd..67a02af 100644
--- a/test/org/apache/pig/test/TestNewPlanColumnPrune.java
+++ b/test/org/apache/pig/test/TestNewPlanColumnPrune.java
@@ -454,6 +454,34 @@
         }
     }
 
+    @Test
+    public void testNoAddForeach() throws Exception  {
+        // PIG-5055
+        // Need to make sure that it does not add foreach
+        // that drops all the fields from B2.
+        String query = "A = load 'd.txt' as (a0:int, a1:int, a2:int);" +
+        "B = load 'd.txt' as (b0:int, b1:int, b2:int);" +
+        "B2 = FILTER B by b0 == 0;" +
+        "C = join A by (1), B2 by (1) ;" +
+        "D = FOREACH C GENERATE A::a1, A::a2;" +
+        "store D into 'empty';";
+
+        LogicalPlan newLogicalPlan = buildPlan(query);
+
+        PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+        optimizer.optimize();
+        System.err.println(newLogicalPlan);
+        Iterator<Operator> iter = newLogicalPlan.getOperators();
+        while (iter.hasNext()) {
+            Operator o = iter.next();
+            LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
+            if (lro == null || lro.getAlias() == null) continue;
+            if (lro.getAlias().equals("B2")) {
+                assertNotNull(lro.getSchema());
+            }
+        }
+    }
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {