PIG-5223: TestLimitVariable.testNestedLimitVariable1 and TestSecondarySortMR.testNestedLimitedSort failing

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1791451 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index a88e57c..174ae5c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,8 @@
  
 BUG FIXES
 
+PIG-5223: TestLimitVariable.testNestedLimitVariable1 and TestSecondarySortMR.testNestedLimitedSort failing (jins via daijy)
+
 PIG-5209: Cross product on flatten(map) fails with ClassCastException (knoguchi)
 
 PIG-5153: Change of behavior in FLATTEN(map) (szita via rohini)
diff --git a/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java b/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
index 6cc4cbf..6c10683 100644
--- a/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
+++ b/src/org/apache/pig/newplan/logical/rules/NestedLimitOptimizer.java
@@ -65,8 +65,11 @@
                 Operator op = it.next();
                 if (op instanceof LOLimit) {
                     List<Operator> preds = innerPlan.getPredecessors(op);
+                    LOLimit limitOp = (LOLimit) op;
+                    boolean useExpressionPlan = limitOp.getLimit() == -1 &&
+                            limitOp.getLimitPlan() != null;
                     // Limit should always have exactly 1 predecessor
-                    if (preds.get(0) instanceof LOSort) {
+                    if (!useExpressionPlan && preds.get(0) instanceof LOSort) {
                         return true;
                     }
                 }
@@ -94,8 +97,11 @@
                 Operator op = it.next();
                 if (op instanceof LOLimit) {
                     List<Operator> preds = innerPlan.getPredecessors(op);
+                    LOLimit limitOp = (LOLimit) op;
+                    boolean useExpressionPlan = limitOp.getLimit() == -1 &&
+                            limitOp.getLimitPlan() != null;
                     // Limit should always have exactly 1 predecessor
-                    if (preds.get(0) instanceof LOSort) {
+                    if (!useExpressionPlan && preds.get(0) instanceof LOSort) {
                         limit = (LOLimit) op;
                         sort = (LOSort) (preds.get(0));
                         break;
diff --git a/test/org/apache/pig/test/TestOptimizeNestedLimit.java b/test/org/apache/pig/test/TestOptimizeNestedLimit.java
index 9faba62..5d2a89e 100644
--- a/test/org/apache/pig/test/TestOptimizeNestedLimit.java
+++ b/test/org/apache/pig/test/TestOptimizeNestedLimit.java
@@ -58,6 +58,7 @@
 public class TestOptimizeNestedLimit {
 
     LogicalPlan lp;
+    LogicalPlan lp1;
     PhysicalPlan php;
     MROperPlan mrp;
 
@@ -76,6 +77,18 @@
         lp = optimizePlan(Util.buildLp(pigServer, query));
         php = Util.buildPp(pigServer, query);
         mrp = Util.buildMRPlanWithOptimizer(php, pc);
+
+        query = "a = load 'myfile' as (id:int, num:int);" +
+                "b = group a by num;" +
+                "c = foreach b generate COUNT(a) as ntup;" +
+                "d = group c all;" +
+                "e = foreach d generate MIN(c.ntup) AS min;" +
+                "f = foreach b {" +
+                "g = order a by id ASC;" +
+                "h = limit g e.min;" +
+                "generate FLATTEN(h);}" +
+                "store f into 'empty';";
+        lp1 = optimizePlan(Util.buildLp(pigServer, query));
     }
 
     @After
@@ -115,6 +128,43 @@
     }
 
     @Test
+    // Test logical plan not being optimized with LOLimit.mLimit = -1
+    public void testLogicalPlan1() throws Exception {
+
+        Iterator<Operator> it = lp1.getOperators();
+
+        LOForEach forEach = null;
+        LOSort sort = null;
+        LOLimit limit = null;
+
+        while(it.hasNext()) {
+            Operator op = it.next();
+            if (op instanceof LOForEach) {
+                forEach = (LOForEach) op;
+
+                Iterator<Operator> innerIt = forEach.getInnerPlan().getOperators();
+
+                while(innerIt.hasNext()) {
+                    Operator innerOp = innerIt.next();
+                    if (innerOp instanceof LOSort) {
+                        assertNull("There should be only one LOSort", sort);
+                        sort = (LOSort) innerOp;
+                    } else if (innerOp instanceof LOLimit) {
+                        assertNull("There should be only one LOLimit", limit);
+                        limit = (LOLimit) innerOp;
+                    }
+                }
+            }
+        }
+
+        assertNotNull("ForEach is missing", forEach);
+        assertNotNull("LOSort is missing", sort);
+        assertNotNull("LOLimit is missing", limit);
+        assertEquals(sort.getLimit(), -1);
+        assertEquals(limit.getLimit(), -1);
+    }
+
+    @Test
     // test physical plan
     public void testPhysicalPlan() throws Exception {
 
diff --git a/test/org/apache/pig/test/TestSecondarySort.java b/test/org/apache/pig/test/TestSecondarySort.java
index e347618..4bc3f65 100644
--- a/test/org/apache/pig/test/TestSecondarySort.java
+++ b/test/org/apache/pig/test/TestSecondarySort.java
@@ -552,7 +552,7 @@
                        "c1 = order a by age;" +
                        "c2 = limit c1 5;" +
                        "generate c2;}" +
-                       "store c in 'empty';";
+                       "store c into 'empty';";
 
         SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);