/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.test;

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.ParserTestingUtils;
import org.apache.pig.test.utils.NewLogicalPlanUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestProjectRange  {

    protected final Log log = LogFactory.getLog(getClass());


    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
    protected static PigServer pigServer;
    private static final String INP_FILE_5FIELDS = "TestProjectRange_5fields";


    @BeforeClass
    public static void oneTimeSetUp() throws Exception {
        String[] input = {"10\t20\t30\t40\t50", "11\t21\t31\t41\t51"};
        Util.createInputFile(cluster, INP_FILE_5FIELDS, input);
    }

    @Before
    public void setup() throws ExecException {
        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
    }

    @After
    public void tearDown() throws Exception {
        pigServer.shutdown();
    }

    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        new File(INP_FILE_5FIELDS).delete();
        if(cluster != null)
            cluster.shutDown();
    }


    /**
     * Test project-range in foreach with limits on both sides
     * @throws IOException
     * @throws ParserException
     */

    @Test
    public void testFullRangeForeachWSchema() throws IOException, ParserException {

        String query;

        //specifying the new aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate a .. c as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : float, cc : int", query, "f");

        //specifying the new aliases - refer to column by pos
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate $0 .. $2 as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : float, cc : int", query, "f");

        //column with pos , name
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate $0 .. c as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : float, cc : int", query, "f");


        //specifying the new aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate b .. d as (bb, cc, dd);"
            ;
        compileAndCompareSchema("bb : float, cc : int, dd : int", query, "f");

        //begin, end of range is same
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate b .. b as (bb), $2 .. $2;"
            ;
        compileAndCompareSchema("bb : float, c : int", query, "f");

        // without aliases - two projections
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int, e : int);"
            + "f = foreach l1 generate a .. c, d .. e ;"
            ;
        compileAndCompareSchema("a : int, b : int, c : int, d : int, e : int", query, "f");

        // without aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : int, e : int);"
            + "f = foreach l1 generate a .. c ;"
            ;
        compileAndCompareSchema("a : int, b : int, c : int", query, "f");
        Iterator<Tuple> it = pigServer.openIterator("f");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(10,20,30)",
                            "(11,21,31)",
                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }


    private void compileAndCompareSchema(String expectedSchStr, String query, String alias)
    throws IOException, ParserException {

        Schema expectedSch = null;

        if(expectedSchStr != null)
            expectedSch = Utils.getSchemaFromString(expectedSchStr);

        compileAndCompareSchema(expectedSch, query, alias);

    }

    private void compileAndCompareSchema(Schema expectedSch, String query,
            String alias) throws IOException {
        Util.registerMultiLineQuery(pigServer, query);

        Schema sch = pigServer.dumpSchema(alias);
        assertEquals("Checking expected schema", expectedSch, sch);
    }

    /**
     * Test project-range in foreach with starting limit
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testEndRangeForeachWSchema() throws IOException, ParserException {

        //specifying the new aliases
        String query;
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  .. c as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : float, cc : int", query, "f");

        //col position
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  .. $2 as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : float, cc : int", query, "f");

        //end is the beginning!
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  .. $0 as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int", query, "f");


        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  .. c as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : float, cc : int", query, "f");

        // without aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : long, e : int);"
            + "f = foreach l1 generate  .. $3 ;"
            ;
        compileAndCompareSchema("a : int, b : int, c : int, d : long", query, "f");
        Iterator<Tuple> it = pigServer.openIterator("f");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(10,20,30,40L)",
                            "(11,21,31,41L)",
                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }


    /**
     * Test project-range in foreach with start limit
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testStartRangeForeachWSchema() throws IOException, ParserException {

        //specifying the new aliases
        String query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' " +
            		"as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  c ..  as (aa, bb, cc);"
            ;
        compileAndCompareSchema("aa : int, bb : int, cc : int", query, "f");

        // without aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' " +
                        "as (a : int, b : int, c : int, d : long, e : int);"
            + "f = foreach l1 generate  $1 ..  ;"
            ;
        compileAndCompareSchema("b : int, c : int, d : long, e : int", query, "f");

        //start with last column - beginning is the end!
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' " +
                        "as (a : int, b : int, c : int, d : long, e : int);"
            + "f = foreach l1 generate  e ..  ;"
            ;
        compileAndCompareSchema("e : int", query, "f");

        //specifying the new aliases for one
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' " +
            		"as (a : int, b : long, c : int, d : int, e : int);"
            + "f = foreach l1 generate  c ..  as (aa, bb, cc), b .. ;"
            ;
        compileAndCompareSchema(
                "aa : int, bb : int, cc : int, b : long, c : int, d : int, e : int",
                query,
                "f"
        );

        Iterator<Tuple> it = pigServer.openIterator("f");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(30,40,50,20L,30,40,50)",
                            "(31,41,51,21L,31,41,51)",

                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }


    /**
     * Test multiple different types of range-project with foreach
     * @throws IOException
     * @throws ParserException
     */
   @Test
    public void testMixRangeForeachWSchema() throws IOException, ParserException {

        //specifying the new aliases
        String query;
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  .. b, c .. d, d .. as (aa, bb);"
            ;
        compileAndCompareSchema("a : int, b : float, c : int, d : int, aa : int, bb : int", query, "f");


        // without aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : int, c : int, d : long, e : int);"
            + "f = foreach l1 generate ..$0 as (first), e.. as (last), d ..,  .. b ;"
            ;
        compileAndCompareSchema("first : int, last : int, d : long, e : int, a : int, b : int", query, "f");
        Iterator<Tuple> it = pigServer.openIterator("f");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(10,50,40L,50,10,20)",
                            "(11,51,41L,51,11,21)",
                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }

    /**
     * -ve test cases
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testNegativeForeachWSchema() throws IOException, ParserException {
        String query;
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  $3 .. $1;"
            ;
        Util.checkExceptionMessage(query, "f",
                "start column appears after end column in range projection");

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' as (a : int, b : float, c : int, d : int, e : int);"
            + "f = foreach l1 generate  c .. b;"
            ;
        Util.checkExceptionMessage(query, "f",
                "start column appears after end column in range projection");
    }


    /**
     * -ve test cases
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testNegativeForeachNOSchema() throws IOException, ParserException {
        String query;
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + "f = foreach l1 generate  $3 .. $1;"
            ;
        Util.checkExceptionMessage(query, "f",
                "start column appears after end column in range projection");

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "' ;"
            + "f = foreach l1 generate  a .. b;"
            ;
        Util.checkExceptionMessage(query, "f",
                "Invalid field projection. Projected field [a] does not exist.");
    }

    /**
     * Test foreach without schema
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testStartRangeForeachNOSchema() throws IOException, ParserException {

        String query;

        // without aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + "f = foreach l1 generate ..$3  as (a,b,c,d);"
            ;
        compileAndCompareSchema("a : bytearray,b : bytearray,c : bytearray,d : bytearray", query, "f");


        Util.registerMultiLineQuery(pigServer, query);

        pigServer.explain("f", System.err);
        Iterator<Tuple> it = pigServer.openIterator("f");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStringAsByteArray(
                    new String[] {
                            "('10','20','30','40')",
                            "('11','21','31','41')",
                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }


    /**
     * Test foreach without schema
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testMixRangeForeachNOSchema() throws IOException, ParserException {

        String query;

        // without aliases
        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + "f = foreach l1 generate ..$0 as (first), $4.. as (last), $3 ..,  .. $1 ;"
            ;
        compileAndCompareSchema((Schema)null, query, "f");


        Util.registerMultiLineQuery(pigServer, query);

        pigServer.explain("f", System.err);
        Iterator<Tuple> it = pigServer.openIterator("f");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStringAsByteArray(
                    new String[] {
                            "('10','50','40','50','10','20')",
                            "('11','51','41','51','11','21')",
                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }

    /**
     * Test foreach without schema - with some operations after the foreach
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testRangeForeachWFilterNOSchema() throws IOException, ParserException {
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + "f = foreach l1 generate ..$0 as (first), $4.. as (last), $3 ..,  .. $1 ;"
            + " fil = filter f by $0 > 10;"
            ;

        Util.registerMultiLineQuery(pigServer, query);

        pigServer.explain("fil", System.err);
        Iterator<Tuple> it = pigServer.openIterator("fil");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStringAsByteArray(
                    new String[] {
                            "('11','51','41','51','11','21')",
                    });
        Util.checkQueryOutputsAfterSort(it, expectedRes);

    }

    @Test
    public void testRangeOrderByWSchema() throws IOException, ParserException{
        String query;

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " o = order l1 by  .. $2 DESC ;"
                ;
            compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "o");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {false,false,false};
            checkNumExpressionPlansForSort(lp, 3, isAsc);
        }

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " o = order l1 by  $3 ..  ;"
                ;
            compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "o");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {true, true};
            checkNumExpressionPlansForSort(lp, 2, isAsc);
        }

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " o = order l1 by  d .. DESC  ;"
                ;
            compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "o");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {false, false};
            checkNumExpressionPlansForSort(lp, 2, isAsc);
        }

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " f = foreach l1 generate *;"
                + " o = order f by  $0 .. c ASC  ;"
                + " lim = limit o 10; ;"
                ;
            compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "lim");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {true, true, true};
            checkNumExpressionPlansForSort(lp, 3, isAsc);
        }

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
                    "' as (a : int, b : long, c : int, d : int, e : int);"
            + " o = order l1 by $0 .. $4  ;"
            ;
        compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "o");

        //check number of sort expression plans

        LogicalPlan lp = createAndProcessLPlan(query);
        boolean[] isAsc = {true,true,true,true,true};
        checkNumExpressionPlansForSort(lp, 5, isAsc);

        Util.registerMultiLineQuery(pigServer, query);

        Iterator<Tuple> it = pigServer.openIterator("o");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(10,20,30,40,50)",
                            "(11,21,31,41,51)",
                    });
        Util.checkQueryOutputs(it, expectedRes);
    }


    /**
     * Test nested order-by with schema
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testRangeOrderByNestedWSchema() throws IOException, ParserException{
        String query;

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " g = group l1 by a;"
                + " f = foreach g { o = order l1 by  .. $2 DESC; generate group, o;}"
                ;
            String expectedSchStr = "g : int,o: {t : (a: int,b: long,c: int,d: int,e: int)}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "f");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {false,false,false};
            checkNumExpressionPlansForSort(lp, 3, isAsc);
        }
        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " g = group l1 by a;"
                + " f = foreach g { o = order l1 by  d .. ; generate group, o;}"
                ;
            String expectedSchStr = "g : int,o: {t : (a: int,b: long,c: int,d: int,e: int)}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "f");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {true,true};
            checkNumExpressionPlansForSort(lp, 2, isAsc);
        }
        {

            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                        "' as (a : int, b : long, c : int, d : int, e : int);"
                + " g = group l1 by a;"
                + " f = foreach g { o = order l1 by  $2 .. $3 ASC, $1..c DESC; generate group, o;}"
                ;
            String expectedSchStr = "g : int,o: {t : (a: int,b: long,c: int,d: int,e: int)}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "f");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {true,true,false,false};
            checkNumExpressionPlansForSort(lp, 4, isAsc);

        }

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
                    "' as (a : int, b : long, c : int, d : int, e : int);"
            + " g = group l1 by a;"
            + " f = foreach g { o = order l1 by  $2 .. $3 DESC, $1 ASC; generate group, o;}"
            ;

        String expectedSchStr = "g : int,o: {t : (a: int,b: long,c: int,d: int,e: int)}";
        Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
        compileAndCompareSchema(expectedSch, query, "f");

        //check number of sort expression plans
               LogicalPlan lp = createAndProcessLPlan(query);
        boolean[] isAsc = {false,false,true};
        checkNumExpressionPlansForSort(lp, 3, isAsc);

        Util.registerMultiLineQuery(pigServer, query);

        Iterator<Tuple> it = pigServer.openIterator("f");

        String[] expectedRes =
                new String[] {
                        "(10,{(10,20,30,40,50)})",
                        "(11,{(11,21,31,41,51)})",
                };
        Schema s = pigServer.dumpSchema("f");
        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }

    /**
     * Test nested order-by without schema
     * @throws IOException
     * @throws ParserException
     */
    @Test
    public void testRangeOrderByNestedNOSchema() throws IOException, ParserException{
        String query;

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS + "';"
                + " g = group l1 by $0;"
                + " f = foreach g { o = order l1 by  .. $2 DESC; generate group, o;}"
                ;
            String expectedSchStr = "g : bytearray, o: {t : ()}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "f");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {false,false,false};
            checkNumExpressionPlansForSort(lp, 3, isAsc);
        }
        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS + "';"
                + " g = group l1 by $0;"
                + " f = foreach g { o = order l1 by  $3 .. ; generate group, o;}"
                ;
            String expectedSchStr = "g : bytearray, o: {t : ()}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "f");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            //project to end can't be expanded
            boolean[] isAsc = {true};
            checkNumExpressionPlansForSort(lp, 1, isAsc);
        }
        {

            query =
                "  l1 = load '" + INP_FILE_5FIELDS + "';"
                + " g = group l1 by $1;"
                + " f = foreach g { o = order l1 by  $2 .. $3 ASC, $1 .. $2 DESC; generate group, o;}"
                ;
            String expectedSchStr = "g : bytearray, o: {t : ()}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "f");

            //check number of sort expression plans

            LogicalPlan lp = createAndProcessLPlan(query);
            boolean[] isAsc = {true,true,false,false};
            checkNumExpressionPlansForSort(lp, 4, isAsc);

        }

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + " g = group l1 by 1;"
            + " f = foreach g { o = order l1 by  $2 .. $3 desc; generate group, o;}"
            ;
        String expectedSchStr = "g : int, o: {t : ()}";
        Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
        compileAndCompareSchema(expectedSch, query, "f");
        //check number of sort expression plans

        LogicalPlan lp = createAndProcessLPlan(query);
        boolean[] isAsc = {false,false};
        checkNumExpressionPlansForSort(lp, 2, isAsc);

        Util.registerMultiLineQuery(pigServer, query);

        Iterator<Tuple> it = pigServer.openIterator("f");

        String[] expectedRes =
                new String[] {
                        "(1,{(11,21,31,41,51),(10,20,30,40,50)})",
                };
        Schema s = pigServer.dumpSchema("f");
        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));

    }

    private LOSort checkNumExpressionPlansForSort(LogicalPlan lp, int numPlans, boolean[] isAsc) {
        Class<?> sortClass = org.apache.pig.newplan.logical.relational.LOSort.class;
        LOSort sort = (LOSort) NewLogicalPlanUtil.getRelOpFromPlan(lp, sortClass);
        assertEquals("number of sort col plans", numPlans, sort.getSortColPlans().size());

        List<Boolean> ascCols = sort.getAscendingCols();
        for(int i = 0; i < ascCols.size(); i++){
            assertEquals("ascending order", isAsc[i], ascCols.get(i));
        }

        return sort;
    }

    private LogicalPlan createAndProcessLPlan(String query) throws FrontendException {
        //TODO: create a common util function for logical plan tests
        LogicalPlan lp = generateLogicalPlan(query);
        lp.validate(pigServer.getPigContext(), "test", false);

        return lp;

    }

    private LogicalPlan generateLogicalPlan(String query) {
        try {
            return ParserTestingUtils.generateLogicalPlan( query );
        } catch(Exception ex) {
            ex.printStackTrace();
            Assert.fail( "Failed to generate logical plan for query [" + query + "] due to exception: " + ex );
        }
        return null;
    }

    @Test
    public void testRangeOrderByMixWSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
                    "' as (a : int, b : long, c : int, d : int, e : int);"
            + " o = order l1 by  b .. c, d .. DESC,  a DESC;"
            ;
        compileAndCompareSchema("a : int, b : long, c : int, d : int, e : int", query, "o");

        //check number of sort expression plans

        LogicalPlan lp = createAndProcessLPlan(query);
        boolean[] isAsc = {true,true,false,false,false};
        checkNumExpressionPlansForSort(lp, 5, isAsc);

        Util.registerMultiLineQuery(pigServer, query);

        Iterator<Tuple> it = pigServer.openIterator("o");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(10,20,30,40,50)",
                            "(11,21,31,41,51)",
                    });
        Util.checkQueryOutputs(it, expectedRes);
    }


    @Test
    public void testRangeOrderByMixNOSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + " o = order l1 by  $1 .. $2 DESC,  $0 , $4 .. DESC;"
            ;
        compileAndCompareSchema((Schema)null, query, "o");

        //check number of sort expression plans

        LogicalPlan lp = createAndProcessLPlan(query);
        boolean[] isAsc = {false, false,true,false};
        checkNumExpressionPlansForSort(lp, 4, isAsc);

        Util.registerMultiLineQuery(pigServer, query);

        pigServer.explain("o", System.err);
        Iterator<Tuple> it = pigServer.openIterator("o");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(11,21,31,41,51)",
                            "(10,20,30,40,50)",
                    });
        Util.checkQueryOutputs(it, expectedRes);
    }

    @Test
    public void testRangeOrderByStartNOSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + " o = order l1 by $3 .. DESC;"
            ;
        compileAndCompareSchema((Schema)null, query, "o");

        //check number of sort expression plans

        LogicalPlan lp = createAndProcessLPlan(query);
        boolean[] isAsc = {false};
        checkNumExpressionPlansForSort(lp, 1, isAsc);

        Util.registerMultiLineQuery(pigServer, query);

        pigServer.explain("o", System.err);
        Iterator<Tuple> it = pigServer.openIterator("o");

        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(11,21,31,41,51)",
                            "(10,20,30,40,50)",
                    });
        Util.checkQueryOutputs(it, expectedRes);
    }

    @Test
    public void testRangeOrderByStartNegNOSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';"
            + " o = order l1 by $3 .. DESC, $1;"
            ;
        Util.checkExceptionMessage(query, "o","Project-range to end (eg. x..)" +
                " is supported in order-by only as last sort column");
    }

    @Test
    public void testRangeGroupWSchema() throws IOException, ParserException{
        String query;

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                "' as (a : int, b : long, c : int, d : int, e : int);" +
                "  l2 = load '" + INP_FILE_5FIELDS +
                "' as (a : int, b : long, c : int, d : int, e : int);" +
                "  g = group l1 by   d ..,  l2 by d ..;"
                ;
            String expectedSchStr = "grp: (d: int,e : int)," +
                            "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
                            "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";

            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "g");
            //check number of group expression plans
            LogicalPlan lp = createAndProcessLPlan(query);
            checkNumExpressionPlansForGroup(lp, 2);
        }

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                "' as (a : int, b : long, c : int, d : int, e : int);" +
                "  l2 = load '" + INP_FILE_5FIELDS +
                "' as (a : int, b : long, c : int, d : int, e : int);" +
                "  g = group l1 by   c .. $3,  l2 by $3..$4;"
                ;
            String expectedSchStr = "grp: (c: int,d : int)," +
                            "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
                            "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";

            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "g");
            //check number of group expression plans
            LogicalPlan lp = createAndProcessLPlan(query);
            checkNumExpressionPlansForGroup(lp, 2);
        }

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  l2 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  g = group l1 by   .. c,  l2 by .. c;"
            ;
        String expectedSchStr = "grp: (a: int,b: long,c: int)," +
                "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
                "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";

        Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
        compileAndCompareSchema(expectedSch, query, "g");


        //check number of group expression plans
        LogicalPlan lp = createAndProcessLPlan(query);
        checkNumExpressionPlansForGroup(lp, 3);

        Util.registerMultiLineQuery(pigServer, query);

        String[] expectedRes =
                new String[] {
                        "((10,20,30),{(10,20,30,40,50)},{(10,20,30,40,50)})",
                        "((11,21,31),{(11,21,31,41,51)},{(11,21,31,41,51)})",
                };
        Iterator<Tuple> it = pigServer.openIterator("g");
        Schema s = pigServer.dumpSchema("g");
        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }

    /**
     * some transformations to schema, because the parser does not accept
     * group as a column name in schema, and to add tuple within bag schema
     * @param expectedSchStr
     * @return
     * @throws ParserException
     * @throws FrontendException
     */
    private Schema getCleanedGroupSchema(String expectedSchStr) throws ParserException, FrontendException {
        Schema expectedSch = Utils.getSchemaFromString(expectedSchStr);
        expectedSch.getField(0).alias = "group";
        if(expectedSch.size() > 1)
            expectedSch.getField(1).schema.getField(0).alias = null;
        if(expectedSch.size() > 2)
            expectedSch.getField(2).schema.getField(0).alias = null;
        expectedSch = org.apache.pig.newplan.logical.Util.fixSchemaAddTupleInBag(expectedSch);
        return expectedSch;
    }

    private LOCogroup checkNumExpressionPlansForGroup(LogicalPlan lp, int numPlans) {
        Class<?> groupClass = org.apache.pig.newplan.logical.relational.LOCogroup.class;
        LOCogroup grp = (LOCogroup) NewLogicalPlanUtil.getRelOpFromPlan(lp, groupClass);

        for( int inp : grp.getExpressionPlans().keySet()){
            List<LogicalExpressionPlan> plans = grp.getExpressionPlans().get(inp);
            assertEquals("number of group-by plans", numPlans, plans.size());
        }

        return grp;
    }

    @Test
    public void testRangeCoGroupMixWSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  l2 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  g = group l1 by  (a + b, c .. d, e.. ),  l2 by ($0 + $1, c..d, $4..);"
            ;
        String expectedSchStr = "grp: (x : long, c :int , d :int, e : int)," +
                        "l1: {t : (a: int,b: long,c: int,d: int,e: int)}," +
                        "l2: {t : (a: int,b: long,c: int,d: int,e: int)}";

        Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
        expectedSch.getField(0).schema.getField(0).alias = null;
        compileAndCompareSchema(expectedSch, query, "g");


        //check number of group expression plans
        LogicalPlan lp = createAndProcessLPlan(query);
        checkNumExpressionPlansForGroup(lp, 4);

        Util.registerMultiLineQuery(pigServer, query);

        String[] expectedRes =
                new String[] {
                        "((30,30,40,50),{(10,20,30,40,50)},{(10,20,30,40,50)})",
                        "((32,31,41,51),{(11,21,31,41,51)},{(11,21,31,41,51)})",
                };
        Iterator<Tuple> it = pigServer.openIterator("g");
        Schema s = pigServer.dumpSchema("g");
        Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }

    @Test
    public void testRangeGroupMixWSchema() throws IOException, ParserException{
        String query;

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS +
                "' as (a : int, b : long, c : int, d : int, e : int);" +
                "  g = group l1 by  b .. c;"
                ;
            String expectedSchStr = "grp: (b : long, c :int)," +
                            "l1: {t : (a: int,b: long,c: int,d: int,e: int)}";

            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "g");


            //check number of group expression plans
            LogicalPlan lp = createAndProcessLPlan(query);
            checkNumExpressionPlansForGroup(lp, 2);
        }

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  g = group l1 by  $2 .. ;" +
            "  lim = limit g 2;"
            ;
        String expectedSchStr = "grp: (c :int , d :int, e : int)," +
                        "l1: {t : (a: int,b: long,c: int,d: int,e: int)}";

        Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
        compileAndCompareSchema(expectedSch, query, "lim");


        //check number of group expression plans
        LogicalPlan lp = createAndProcessLPlan(query);
        checkNumExpressionPlansForGroup(lp, 3);

        Util.registerMultiLineQuery(pigServer, query);

        String[] expectedRes =
                new String[] {
                        "((30,40,50),{(10,20L,30,40,50)})",
                        "((31,41,51),{(11,21L,31,41,51)})",
                };
        Iterator<Tuple> it = pigServer.openIterator("lim");
        Schema s = pigServer.dumpSchema("lim");
        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }


    @Test
    public void testRangeGroupMixNOSchema() throws IOException, ParserException{
        String query;

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS + "';" +
                "  g = group l1 by  .. $2;"
                ;
            String expectedSchStr = "g : (duma, dumb, dumc), l1: {t : ()}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            setAliasesToNull(expectedSch.getField(0).schema);
            compileAndCompareSchema(expectedSch, query, "g");


            //check number of group expression plans
            LogicalPlan lp = createAndProcessLPlan(query);
            checkNumExpressionPlansForGroup(lp, 3);
        }

        {
            query =
                "  l1 = load '" + INP_FILE_5FIELDS + "';" +
                "  g = group l1 by  $3 .. $3;"
                ;
            String expectedSchStr = "g : bytearray, l1: {t : ()}";
            Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
            compileAndCompareSchema(expectedSch, query, "g");


            //check number of group expression plans
            LogicalPlan lp = createAndProcessLPlan(query);
            checkNumExpressionPlansForGroup(lp, 1);
        }

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';" +
            "  g = group l1 by  $2 .. ;"
            ;
        String expectedSchStr = "grp: (), l1: {t : ()}";

        Schema expectedSch = getCleanedGroupSchema(expectedSchStr);
        compileAndCompareSchema(expectedSch, query, "g");


        //check number of group expression plans
        LogicalPlan lp = createAndProcessLPlan(query);
        checkNumExpressionPlansForGroup(lp, 1);

        Util.registerMultiLineQuery(pigServer, query);

        String[] expectedRes =
                new String[] {
                        "((30,40,50),{(10,20,30,40,50)})",
                        "((31,41,51),{(11,21,31,41,51)})",
                };
        Iterator<Tuple> it = pigServer.openIterator("g");
        Schema s = pigServer.dumpSchema("g");
        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }

    private void setAliasesToNull(Schema schema) {
       for(FieldSchema fs : schema.getFields()){
           fs.alias = null;
       }
    }

    @Test
    public void testRangeJoinMixWSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  l2 = load '" + INP_FILE_5FIELDS +
            "' as (a : int, b : long, c : int, d : int, e : int);" +
            "  j = join l1 by  (a + b, c .. d, e.. ),  l2 by ($0 + $1, c..d, $4..);"
            ;
        String expectedSchStr = "l1::a: int, l1::b: long, l1::c: int, l1::d: int, l1::e: int," +
                        "l2::a: int, l2::b: long, l2::c: int, l2::d: int, l2::e: int";

        compileAndCompareSchema(expectedSchStr, query, "j");


        //check number of group expression plans
        LogicalPlan lp = createAndProcessLPlan(query);
        checkNumExpressionPlansForJoin(lp, 4);

        Util.registerMultiLineQuery(pigServer, query);

        String[] expectedRes =
                new String[] {
                        "(10,20,30,40,50,10,20,30,40,50)",
                        "(11,21,31,41,51,11,21,31,41,51)",
                };
        Iterator<Tuple> it = pigServer.openIterator("j");
        Schema s = pigServer.dumpSchema("j");
        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }

    @Test
    public void testRangeJoinMixNOSchema() throws IOException, ParserException{
        String query;

        query =
            "  l1 = load '" + INP_FILE_5FIELDS + "';" +
            "  l2 = load '" + INP_FILE_5FIELDS + "';" +
            "  j = join l1 by  $0 .. $3,  l2 by $0 .. $3;"
            ;

        compileAndCompareSchema((Schema)null, query, "j");

        //check number of group expression plans
        LogicalPlan lp = createAndProcessLPlan(query);
        checkNumExpressionPlansForJoin(lp, 4);

        Util.registerMultiLineQuery(pigServer, query);

        String[] expectedRes =
                new String[] {
                        "(10,20,30,40,50,10,20,30,40,50)",
                        "(11,21,31,41,51,11,21,31,41,51)",
                };
        Iterator<Tuple> it = pigServer.openIterator("j");
        Schema s = pigServer.dumpSchema("j");
        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
                Util.isSparkExecType(cluster.getExecType()));
    }

    @Test
    public void testRangeCoGroupNegNoSchema() throws IOException, ParserException{
        String query;
        //cogroup
        query =
            "  l1 = load '" + INP_FILE_5FIELDS +  "';" +
            "  l2 = load '" + INP_FILE_5FIELDS +  "';" +
            "  g = cogroup l1 by  ($0 ..  ),  l2 by ($0 .. );";
        Util.checkExceptionMessage(query, "g", "Cogroup/Group by '*' or 'x..' " +
                "(range of columns to the end) " +
                        "is only allowed if the input has a schema");
    }

    private LOJoin checkNumExpressionPlansForJoin(LogicalPlan lp, int numPlans) {
        Class<?> joinClass = org.apache.pig.newplan.logical.relational.LOJoin.class;
        LOJoin join = (LOJoin) NewLogicalPlanUtil.getRelOpFromPlan(lp, joinClass);

        for( int inp : join.getExpressionPlans().keySet()){
            List<LogicalExpressionPlan> plans = join.getExpressionPlans().get(inp);
            assertEquals("number of join exp plans", numPlans, plans.size());
        }

        return join;
    }

}
