/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.tools.pigstats.InputStats;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class TestMultiQuery {

    private static PigServer myPig;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        Util.copyFromLocalToLocal(
                "test/org/apache/pig/test/data/passwd", "passwd");
        Util.copyFromLocalToLocal(
                "test/org/apache/pig/test/data/passwd2", "passwd2");
        Properties props = new Properties();
        props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
        myPig = new PigServer(Util.getLocalTestMode(), props);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), "passwd");
        Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), "passwd2");
        deleteOutputFiles();
    }

    private static final String simpleEchoStreamingCommand;
    static {
        if (Util.WINDOWS)
            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
        else
            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
    }

    @Before
    public void setUp() throws Exception {
        deleteOutputFiles();
    }

    @After
    public void tearDown() throws Exception {

    }

    @Test
    public void testMultiQueryJiraPig1438() throws Exception {

        // test case: merge multiple distinct jobs

        String INPUT_FILE = "abc";

        String[] inputData = {
                "1\t2\t3",
                "2\t3\t4",
                "1\t2\t3",
                "2\t3\t4",
                "1\t2\t3"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
        myPig.registerQuery("B1 = foreach A generate col1, col2;");
        myPig.registerQuery("B2 = foreach A generate col2, col3;");
        myPig.registerQuery("C1 = distinct B1;");
        myPig.registerQuery("C2 = distinct B2;");
        myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
        myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
        myPig.registerQuery("store D1 into 'output1';");
        myPig.registerQuery("store D2 into 'output2';");

        myPig.executeBatch();

        myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
        Iterator<Tuple> iter = myPig.openIterator("E");

        String[] expectedResults = new String[]{
                        "(1,2)",
                        "(2,3)"
        };
        Schema s = myPig.dumpSchema("E");
        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
                .isSparkExecType(Util.getLocalTestMode()));


        myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
        iter = myPig.openIterator("E");

        expectedResults = new String[]{
                        "(2,3)",
                        "(3,4)"
        };
        s = myPig.dumpSchema("E");
        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
                .isSparkExecType(Util.getLocalTestMode()));
    }

    @Test
    public void testMultiQueryJiraPig1252() throws Exception {

        // test case: Problems with secondary key optimization and multiquery
        // diamond optimization

        String INPUT_FILE = "abc";

        String[] inputData = {
            "1\t2\t3",
            "2\t3\t4",
            "3\t\t5",
            "5\t6\t6",
            "6\t\t7"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
        myPig.registerQuery("B = foreach A generate (chararray) col1, " +
                "(chararray) ((col2 is not null) ?  " +
                "col2 : (col3 < 6 ? col3 : '')) as splitcond;");
        myPig.registerQuery("split B into C if splitcond !=  '', D if splitcond == '';");
        myPig.registerQuery("E = group C by splitcond;");
        myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");

        Iterator<Tuple> iter = myPig.openIterator("F");

        String[] expectedResults = new String[]{
                        "(1,2)",
                        "(2,3)",
                        "(3,5)",
                        "(5,6)"
        };
        Schema s = myPig.dumpSchema("F");
        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
                .isSparkExecType(Util.getLocalTestMode()));
    }

    @Test
    public void testMultiQueryJiraPig1169() throws Exception {

        // test case: Problems with some top N queries

        String INPUT_FILE = "abc";

        String[] inputData = {
                "1\t2\t3",
                "2\t3\t4",
                "3\t4\t5",
                "5\t6\t7",
                "6\t7\t8"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("A = load '" + INPUT_FILE
                + "' as (a:int, b, c);");
        myPig.registerQuery("A1 = Order A by a desc;");
        myPig.registerQuery("A2 = limit A1 2;");
        myPig.registerQuery("store A1 into 'output1';");
        myPig.registerQuery("store A2 into 'output2';");

        myPig.executeBatch();

        myPig.registerQuery("B = load 'output2' as (a:int, b, c);");

        Iterator<Tuple> iter = myPig.openIterator("B");

        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {
                        "(6,7,8)",
                        "(5,6,7)"
                });

        int counter = 0;
        while (iter.hasNext()) {
            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
        }

        assertEquals(expectedResults.size(), counter);
    }

    @Test
    public void testMultiQueryJiraPig1171() throws Exception {

        // test case: Problems with some top N queries

        String INPUT_FILE = "abc";

        String[] inputData = {
            "1\tapple\t3",
            "2\torange\t4",
            "3\tpersimmon\t5"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("A = load '" + INPUT_FILE
                + "' as (a:long, b, c);");
        myPig.registerQuery("A1 = Order A by a desc;");
        myPig.registerQuery("A2 = limit A1 1;");
        myPig.registerQuery("B = load '" + INPUT_FILE
                + "' as (a:long, b, c);");
        myPig.registerQuery("B1 = Order B by a desc;");
        myPig.registerQuery("B2 = limit B1 1;");

        myPig.registerQuery("C = cross A2, B2;");

        Iterator<Tuple> iter = myPig.openIterator("C");

        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {
                        "(3L,'persimmon',5,3L,'persimmon',5)"
                });

        int counter = 0;
        while (iter.hasNext()) {
            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
        }

        assertEquals(expectedResults.size(), counter);
    }

    @Test
    public void testMultiQueryJiraPig1157() throws Exception {

        // test case: Sucessive replicated joins do not generate Map Reduce plan and fails due to OOM

        String INPUT_FILE = "abc";
        String INPUT_FILE_1 = "abc";

        String[] inputData = {
                "1\tapple\t3",
                "2\torange\t4",
                "3\tpersimmon\t5"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("A = load '" + INPUT_FILE
                + "' as (a:long, b, c);");
        myPig.registerQuery("A1 = FOREACH A GENERATE a;");
        myPig.registerQuery("B = GROUP A1 BY a;");
        myPig.registerQuery("C = load '" + INPUT_FILE_1
                + "' as (x:long, y);");
        myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
        myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");

        Iterator<Tuple> iter = myPig.openIterator("E");

        String[] expectedResults = new String[]{
                "(1L,apple,3,1L,apple,1L,{(1L)})",
                "(2L,orange,4,2L,orange,2L,{(2L)})",
                "(3L,persimmon,5,3L,persimmon,3L,{(3L)})"
        };
        Schema s = myPig.dumpSchema("E");
        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
                .isSparkExecType(Util.getLocalTestMode()));
    }

    @Test
    public void testMultiQueryJiraPig1068() throws Exception {

        // test case: COGROUP fails with 'Type mismatch in key from map:
        // expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'

        String INPUT_FILE = "pig-1068.txt";

        String[] inputData = {
            "10\tapple\tlogin\tjar",
            "20\torange\tlogin\tbox",
            "30\tstrawberry\tquit\tbot"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("logs = load '" + INPUT_FILE
                + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
        myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
        myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
        myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
        myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
                + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
        myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
        myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
        myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");

        Iterator<Tuple> iter = myPig.openIterator("joined_session_info");

        String[] expectedResults = new String[]{
                "(apple,{},{(apple,jar,1L)})",
                "(orange,{},{(orange,box,1L)})",
                "(strawberry,{(30,strawberry,quit,bot)},{})"};

        Schema s = myPig.dumpSchema("joined_session_info");
        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
                .isSparkExecType(Util.getLocalTestMode()));
    }

    @Test
    public void testMultiQueryJiraPig1108() throws Exception {

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' "
                + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
        myPig.registerQuery("b = group plan1 by uname;");
        myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
                + "generate flatten(group) as foo, tmp; };");
        myPig.registerQuery("d = filter c BY foo is not null;");
        myPig.registerQuery("store d into 'output1';");
        myPig.registerQuery("store plan2 into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig1114() throws Exception {

        // test case: MultiQuery optimization throws error when merging 2 level splits

        String INPUT_FILE = "data.txt";

        String[] inputData = {
            "10\tjar",
            "20\tbox",
            "30\tbot"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("data = load '" + INPUT_FILE
                + "' USING PigStorage as (id:int, name:chararray);");
        myPig.registerQuery("ids = FOREACH data GENERATE id;");
        myPig.registerQuery("allId = GROUP ids all;");
        myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as allId, COUNT(ids) as total;");
        myPig.registerQuery("idGroup = GROUP ids by id;");
        myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group as id, COUNT(ids) as count;");
        myPig.registerQuery("countTotal = cross idGroupCount, allIdCount;");
        myPig.registerQuery("idCountTotal = foreach countTotal generate id, count, total, (double)count / (double)total as proportion;");
        myPig.registerQuery("orderedCounts = order idCountTotal by count desc;");
        myPig.registerQuery("STORE orderedCounts INTO 'output1';");

        myPig.registerQuery("names = FOREACH data GENERATE name;");
        myPig.registerQuery("allNames = GROUP names all;");
        myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE group as namesAll, COUNT(names) as total;");
        myPig.registerQuery("nameGroup = GROUP names by name;");
        myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE group as name, COUNT(names) as count;");
        myPig.registerQuery("namesCrossed = cross nameGroupCount, allNamesCount;");
        myPig.registerQuery("nameCountTotal = foreach namesCrossed generate name, count, total, (double)count / (double)total as proportion;");
        myPig.registerQuery("nameCountsOrdered = order nameCountTotal by count desc;");
        myPig.registerQuery("STORE nameCountsOrdered INTO 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

/** Test case is disabled due to PIG-2038
    @Test
    public void testMultiQueryJiraPig1113() throws Exception {

        // test case: Diamond query optimization throws error in JOIN

        String INPUT_FILE_1 = "set1.txt";
        String INPUT_FILE_2 = "set2.txt";


        String[] inputData_1 = {
            "login\t0\tjar",
            "login\t1\tbox",
            "quit\t0\tmany"
        };

        Util.createLocalInputFile(INPUT_FILE_1, inputData_1);

        String[] inputData_2 = {
            "apple\tlogin\t{(login)}",
            "orange\tlogin\t{(login)}",
            "strawberry\tquit\t{(login)}"
        };

        Util.createLocalInputFile(INPUT_FILE_2, inputData_2);

        myPig.setBatchOn();

        myPig.registerQuery("set1 = load '" + INPUT_FILE_1
                + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
        myPig.registerQuery("set2 = load '" + INPUT_FILE_2
                + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
        myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
                + "(chararray) 0 as f3;");
        myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
                + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
        myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
        myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");

        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {
                        "('quit','0','many','strawberry','quit','0')",
                        "('login','0','jar','apple','login','0')",
                        "('login','0','jar','orange','login','0')",
                        "('login','1','box','apple','login','1')",
                        "('login','1','box','orange','login','1')",
                        "('login','1','box','strawberry','login','1')"
                });

        Iterator<Tuple> iter = myPig.openIterator("joined_sets");
        int count = 0;
        while (iter.hasNext()) {
            assertEquals(expectedResults.get(count++).toString(), iter.next().toString());
        }
        assertEquals(expectedResults.size(), count);
    }

    @Test
    public void testMultiQueryJiraPig1060_2() throws Exception {

        // test case:

        String INPUT_FILE = "pig-1060.txt";

        String[] inputData = {
            "apple\t2",
            "apple\t12",
            "orange\t3",
            "orange\t23",
            "strawberry\t10",
            "strawberry\t34"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("data = load '" + INPUT_FILE +
        "' as (name:chararray, gid:int);");
        myPig.registerQuery("f1 = filter data by gid < 5;");
        myPig.registerQuery("g1 = group f1 by name;");
        myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
        myPig.registerQuery("store p1 into 'output1';");

        myPig.registerQuery("f2 = filter data by gid > 5;");
        myPig.registerQuery("g2 = group f2 by name;");
        myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
        myPig.registerQuery("store p2 into 'output2';");

        myPig.registerQuery("f3 = filter f2 by gid > 10;");
        myPig.registerQuery("g3 = group f3 by name;");
        myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
        myPig.registerQuery("store p3 into 'output3';");

        myPig.registerQuery("f4 = filter f3 by gid < 20;");
        myPig.registerQuery("g4 = group f4 by name;");
        myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
        myPig.registerQuery("store p4 into 'output4';");

        List<ExecJob> jobs = myPig.executeBatch();
        assertEquals(4, jobs.size());

        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig920_2() throws Exception {

        // test case: execution of a query with two diamonds

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                             "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = filter a by uid < 5;");
        myPig.registerQuery("c = filter a by gid >= 5;");
        myPig.registerQuery("d = filter a by uid >= 5;");
        myPig.registerQuery("e = filter a by gid < 5;");
        myPig.registerQuery("f = cogroup c by $0, b by $0;");
        myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
        myPig.registerQuery("store f1 into 'output1';");
        myPig.registerQuery("g = cogroup d by $0, e by $0;");
        myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
        myPig.registerQuery("store g1 into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig920_3() throws Exception {

        // test case: execution of a simple diamond query

        String INPUT_FILE = "pig-920.txt";

        String[] inputData = {
            "apple\tapple\t100\t10",
            "apple\tapple\t200\t20",
            "orange\torange\t100\t10",
            "orange\torange\t300\t20"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("a = load '" + INPUT_FILE +
                            "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = filter a by uid < 300;");
        myPig.registerQuery("c = filter a by gid > 10;");
        myPig.registerQuery("d = cogroup c by $0, b by $0;");
        myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");

        Iterator<Tuple> iter = myPig.openIterator("e");

        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {
                        "('apple',1L,2L)",
                        "('orange',1L,1L)"
                });

        int counter = 0;
        while (iter.hasNext()) {
            assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
        }

        assertEquals(expectedResults.size(), counter);
    }

    @Test
    public void testMultiQueryJiraPig976() throws Exception {

        // test case: key ('group') isn't part of foreach output
        // and keys have the same type.

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = group a by uid;");
        myPig.registerQuery("c = group a by gid;");
        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
        myPig.registerQuery("store d into 'output1';");
        myPig.registerQuery("store e into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig976_2() throws Exception {

        // test case: key ('group') isn't part of foreach output
        // and keys have different types

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = group a by uname;");
        myPig.registerQuery("c = group a by gid;");
        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
        myPig.registerQuery("store d into 'output1';");
        myPig.registerQuery("store e into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig976_3() throws Exception {

        // test case: group all and key ('group') isn't part of output

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = group a all;");
        myPig.registerQuery("c = group a by gid;");
        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
        myPig.registerQuery("store d into 'output1';");
        myPig.registerQuery("store e into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig976_4() throws Exception {

        // test case: group by multi-cols and key ('group') isn't part of output

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = group a by uid;");
        myPig.registerQuery("c = group a by (uname, gid);");
        myPig.registerQuery("d = foreach b generate SUM(a.gid);");
        myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);");
        myPig.registerQuery("store d into 'output1';");
        myPig.registerQuery("store e into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig976_5() throws Exception {

        // test case: key ('group') in multiple positions.

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                            "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = group a by uid;");
        myPig.registerQuery("c = group a by (uname, gid);");
        myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group as foo;");
        myPig.registerQuery("d1 = foreach d generate $1 + $2;");
        myPig.registerQuery("e = foreach c generate group, COUNT(a);");
        myPig.registerQuery("store d1 into 'output1';");
        myPig.registerQuery("store e into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }
*/
    @Test
    public void testMultiQueryJiraPig976_6() throws Exception {

        // test case: key ('group') has null values.

        String INPUT_FILE = "pig-976.txt";

        String[] inputData = {
            "apple\tapple\t100\t10",
            "apple\tapple\t\t20",
            "orange\torange\t100\t10",
            "orange\torange\t\t20",
            "strawberry\tstrawberry\t300\t10"
        };

        Util.createLocalInputFile(INPUT_FILE, inputData);

        myPig.setBatchOn();

        myPig.registerQuery("a = load '" + INPUT_FILE +
                            "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = group a by uid;");
        myPig.registerQuery("c = group a by gid;");
        myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
        myPig.registerQuery("e = foreach c generate COUNT(a), group;");
        myPig.registerQuery("store d into 'output1';");
        myPig.registerQuery("store e into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        assertTrue(jobs.size() == 2);

        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig983_2() throws Exception {

        System.out.println("===== multi-query Jira Pig-983_2 =====");

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                             "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b = filter a by uid < 5;");
        myPig.registerQuery("c = filter a by uid >= 5;");
        myPig.registerQuery("d = join b by uname, c by uname;");
        myPig.registerQuery("e = group d by b::gid;");
        myPig.registerQuery("e1 = foreach e generate group, COUNT(d.b::uid);");
        myPig.registerQuery("store e1 into 'output1';");
        myPig.registerQuery("f = group d by c::gid;");
        myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
        myPig.registerQuery("store f1 into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();

        assertTrue(jobs.size() == 2);

        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    /**
       This test will fail indeterministically without the PIG-3902 patch. Sometimes
       the loads will get attached as dependencies to the appropriate stores (during
       the postProcess() method on PigServer.Graph) and the dag will successfully
       run, while other times it will fail (if the loads get attached as dependencies
       of the incorrect stores).
     */
    @Test
    public void testMultiQueryJiraPig3902() throws Exception {

        // test case: Pig Server creates implicit cycle when
        // loading and storing from same location with an
        // intermediate store.

        Storage.Data data = Storage.resetData(myPig);
        data.set("inputLocation", Storage.tuple(1,2,3));

        myPig.setBatchOn();
        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b, c);");
        myPig.registerQuery("A1 = order A by a desc parallel 3;");
        myPig.registerQuery("A2 = limit A1 2;");
        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
        myPig.registerQuery("A3 = load 'output1' using mock.Storage()as (a:int, b, c);");
        myPig.registerQuery("A4 = filter A3 by a > 2;");
        myPig.registerQuery("store A4 into 'inputLocation' using mock.Storage();");

        List<ExecJob> jobs = myPig.executeBatch();

        assertTrue(jobs.size() == 2);

        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
    }

    @Test
    public void testMultiQueryJiraPig4170() throws Exception {

        Storage.Data data = Storage.resetData(myPig);
        data.set("inputLocation", Storage.tuple(1, "hello"), Storage.tuple(2, "world"));

        myPig.setBatchOn();
        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:chararray);");
        myPig.registerQuery("A1 = group A by a;");
        myPig.registerQuery("A2 = group A by b;");
        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
        myPig.registerQuery("store A2 into 'output2' using mock.Storage();");

        myPig.executeBatch();

        myPig.registerQuery("A = load 'output1' using mock.Storage() as (a:int, c:bag{(i:int, s:chararray)});");
        Iterator<Tuple> iter = myPig.openIterator("A");
        iter.next().toString().equals("(1,{(1,hello)})");
        iter.next().toString().equals("(2,{(2,world)})");

        myPig.registerQuery("A = load 'output2' using mock.Storage() as (b:chararray, c:bag{(i:int, s:chararray)});");
        iter = myPig.openIterator("A");
        iter.next().toString().equals("(hello,{(1,hello)})");
        iter.next().toString().equals("(world,{(2,world)})");
    }

    @Test
    public void testMultiQueryJiraPig4480() throws Exception {

        Storage.Data data = Storage.resetData(myPig);
        data.set("inputLocation",
                Storage.tuple(1, Storage.bag(Storage.tuple("hello"), Storage.tuple("world"), Storage.tuple("program"))),
                Storage.tuple(2, Storage.bag(Storage.tuple("my"), Storage.tuple("world"))));

        myPig.setBatchOn();
        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:bag{(c:chararray)});");
        myPig.registerQuery("A = foreach A generate a, flatten(b);");
        myPig.registerQuery("A1 = foreach A generate a;");
        myPig.registerQuery("A1 = distinct A1;");
        myPig.registerQuery("A2 = filter A by c == 'world';");
        myPig.registerQuery("A2 = ORDER A2 by a parallel 2;");
        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
        myPig.registerQuery("store A2 into 'output2' using mock.Storage();");

        myPig.executeBatch();

        List<Tuple> actualResults = data.get("output1");
        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {"(1)", "(2)"});
        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);

        actualResults = data.get("output2");
        expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {"(1, 'world')", "(2, 'world')"});
        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
    }

    @Test
    public void testMultiQueryJiraPig4493() throws Exception {

        // Union followed by Split
        Storage.Data data = Storage.resetData(myPig);
        data.set("inputLocation",
                Storage.tuple("1", "Dyson"),
                Storage.tuple("2", "Miele"),
                Storage.tuple("3", "Black & Decker")
                );

        myPig.setBatchOn();
        myPig.registerQuery("A = load 'inputLocation' using mock.Storage();");
        myPig.registerQuery("A = foreach A generate (int)$0 as a, (chararray)$1 as b;");
        myPig.registerQuery("A1 = FILTER A by b matches '.*[a-zA-Z] *& *[a-zA-Z].*';");
        myPig.registerQuery("A1 = FOREACH A1 generate a, REPLACE(b,'&','and')  as b;");
        myPig.registerQuery("A = UNION A1, A;");
        myPig.registerQuery("A = FOREACH A generate a, LOWER(b) as b;");
        myPig.registerQuery("A2 = GROUP A by a;");
        myPig.registerQuery("A2 = FOREACH A2 generate group, COUNT(A) as cnt;");
        myPig.registerQuery("store A2 into 'output1' using mock.Storage();");
        myPig.registerQuery("A = FILTER A BY b is not null and b != '';");
        myPig.registerQuery("store A into 'output2' using mock.Storage();");

        myPig.executeBatch();

        List<Tuple> actualResults = data.get("output1");
        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {"(1, 1L)", "(2, 1L)", "(3, 2L)"});
        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);

        actualResults = data.get("output2");
        expectedResults = Util.getTuplesFromConstantTupleStrings(new String[] {
                "(1, 'dyson')", "(2, 'miele')", "(3, 'black & decker')",
                "(3, 'black and decker')" });
        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
    }

    @Test
    public void testMultiQueryJiraPig4899() throws Exception {
        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' "
                + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
        myPig.registerQuery("b1 = foreach a generate uname;");
        myPig.registerQuery("b2 = foreach a generate uid;");
        myPig.registerQuery("store b1 into 'output1';");
        myPig.registerQuery("store b2 into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();
        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
            List<InputStats> stats = job.getStatistics().getInputStats();
            assertEquals(1,stats.size());
            InputStats stat = stats.get(0);
            assertEquals("Number of records in passwd file is 14",14,stat.getNumberRecords());
        }
    }

    @Test
    public void testMultiQueryJiraPig4883() throws Exception {
        Storage.Data data = Storage.resetData(myPig);
        data.set("inputLocation",
                Storage.tuple("c", "12"), Storage.tuple("d", "-12"));
        myPig.setBatchOn();
        myPig.registerQuery("A = load 'inputLocation' using mock.Storage();");
        myPig.registerQuery("A = foreach A generate (chararray)$0 as id, (long)$1 as val;");
        myPig.registerQuery("B = filter A by val > 0;");
        myPig.registerQuery("B1 = group B by val;");
        myPig.registerQuery("B1 = foreach B1 generate group as name, COUNT(B) as value;");
        myPig.registerQuery("B1 = foreach B1 generate (chararray)name,value;");
        myPig.registerQuery("store B1 into 'output1' using mock.Storage();");
        myPig.registerQuery("B2 = group B by id;");
        myPig.registerQuery("B2 = foreach B2 generate group as name, COUNT(B) as value;");
        myPig.registerQuery("store B2 into 'output2' using mock.Storage();");
        myPig.registerQuery("C = filter A by val < 0;");
        myPig.registerQuery("C1 = group C by val;");
        myPig.registerQuery("C1 = foreach C1 generate group as name, COUNT(C) as value;");
        myPig.registerQuery("store C1 into 'output3' using mock.Storage();");
        myPig.registerQuery("C2 = group C by id;");
        myPig.registerQuery("C2 = foreach C2 generate group as name, COUNT(C) as value;");
        myPig.registerQuery("store C2 into 'output4' using mock.Storage();");
        myPig.executeBatch();

        List<Tuple> actualResults = data.get("output1");
        String[] expectedResults = new String[]{"(12, 1)"};
        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan
                .logical.Util.translateSchema(myPig.dumpSchema("B1")), Util.isSparkExecType(Util.getLocalTestMode()));


        actualResults = data.get("output2");
        expectedResults = new String[]{"(c,1)"};
        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
                .translateSchema(myPig.dumpSchema("B2")), Util.isSparkExecType(Util.getLocalTestMode()));

        actualResults = data.get("output3");
        expectedResults = new String[]{"(-12, 1)"};
        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
                .translateSchema(myPig.dumpSchema("C1")), Util.isSparkExecType(Util.getLocalTestMode()));

        actualResults = data.get("output4");
        expectedResults = new String[]{"(d,1)"};
        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
                .translateSchema(myPig.dumpSchema("C2")), Util.isSparkExecType(Util.getLocalTestMode()));
    }

    @Test
    public void testMultiQueryJiraPig4548() throws Exception {
        Storage.Data data = Storage.resetData(myPig);
        data.set("inputLocation",
                Storage.tuple("1", "f"),
                Storage.tuple("2", "f"),
                Storage.tuple("3", "f"),
                Storage.tuple("4", "f"),
                Storage.tuple("5", "f"),
                Storage.tuple("6", "f"));
        myPig.setBatchOn();
        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (f1:chararray, f2:chararray);");
        myPig.registerQuery("SPLIT A into T if (f2 == 'T'), F otherwise;");
        myPig.registerQuery("T2 = group T by f1;");
        myPig.registerQuery("store T2 into 'output1' using mock.Storage();");
        myPig.registerQuery("F2 = group F by f1;");
        myPig.registerQuery("F3 = stream F2 through `" + simpleEchoStreamingCommand
                            + "` as (group:chararray, F:bag{tuple(f1: chararray,f2: chararray)});");
        myPig.registerQuery("store F3 into 'output2' using mock.Storage();");
        myPig.executeBatch();

        List<Tuple> actualResults = data.get("output1");
        assertEquals("Number of records for output1 should be 0",0,actualResults.size());
        actualResults = data.get("output2");
        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                                         new String[]{"('1',{('1','f')})",
                                                      "('2',{('2','f')})",
                                                      "('3',{('3','f')})",
                                                      "('4',{('4','f')})",
                                                      "('5',{('5','f')})",
                                                      "('6',{('6','f')})"});
        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
    }

    // --------------------------------------------------------------------------
    // Helper methods

    private static void deleteOutputFiles() {
        Util.deleteDirectory(new File("output1"));
        Util.deleteDirectory(new File("output2"));
        Util.deleteDirectory(new File("output3"));
        Util.deleteDirectory(new File("output4"));
        Util.deleteDirectory(new File("output5"));
    }
}
