/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class TestEvalPipeline2 {

    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
    private PigServer pigServer;

    TupleFactory mTf = TupleFactory.getInstance();
    BagFactory mBf = BagFactory.getInstance();

    @Before
    public void setUp() throws Exception{
        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
    }

    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        cluster.shutDown();
    }

    @Test
    public void testUdfInputOrder() throws IOException {
        String[] input = {
                "(123)",
                "((123)",
                "(123123123123)",
                "(asdf)"
        };

        Util.createInputFile(cluster, "table_udfInp", input);
        pigServer.registerQuery("a = load 'table_udfInp' as (i:int);");
        pigServer.registerQuery("b = foreach a {dec = 'hello'; str1 = " +  Identity.class.getName() +
                    "(dec,'abc','def');" +
                    "generate dec,str1; };");
        Iterator<Tuple> it = pigServer.openIterator("b");

        Tuple tup=null;

        //tuple 1
        tup = it.next();
        Tuple out = (Tuple)tup.get(1);

        Assert.assertEquals( out.get(0).toString(), "hello");
        Assert.assertEquals(out.get(1).toString(), "abc");
        Assert.assertEquals(out.get(2).toString(), "def");

        Util.deleteFile(cluster, "table_udfInp");
    }


    @Test
    public void testUDFwithStarInput() throws Exception {
        int LOOP_COUNT = 10;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigServer
                        .getPigContext()) + "';");
        pigServer.registerQuery("B = group A by $0;");
        String query = "C = foreach B {"
        + "generate " + Identity.class.getName() +"(*);"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        List<Tuple> actualResList = new ArrayList<Tuple>();
        while(iter.hasNext()){
            actualResList.add(iter.next());
        }

        Util.sortQueryOutputsIfNeed(actualResList,Util.isSparkExecType(cluster.getExecType()));

        int numIdentity = 0;
        for (Tuple tuple : actualResList) {
            Tuple t = (Tuple)tuple.get(0);
            Assert.assertEquals(DataByteArray.class, t.get(0).getClass());
            int group = Integer.parseInt(new String(((DataByteArray)t.get(0)).get()));
            Assert.assertEquals(numIdentity, group);
            Assert.assertTrue(t.get(1) instanceof DataBag);
            DataBag bag = (DataBag)t.get(1);
            Assert.assertEquals(10, bag.size());
            Assert.assertEquals(2, t.size());
            ++numIdentity;
        }
        Assert.assertEquals(LOOP_COUNT, numIdentity);

    }
    @Test
    public void testBinStorageByteArrayCastsSimple() throws IOException {
        // Test for PIG-544 fix
        // Tries to read data in BinStorage bytearrays as other pig types,
        // should return null if the conversion fails.
        // This test case does not use a practical example , it just tests
        // if the conversion happens when minimum conditions for conversion
        // such as expected number of bytes are met.
        String[] input = {
                    "asdf\t12\t1.1\t231\t234\t3024123\t3.2492",
                    "sa\t1231\t123.4\t12345678\t1234.567\t5081123453\t9.181817",
                    "asdff\t1232123\t1.45345\t123456789\t123456789.9\t1234567\t1.234567"
                    };

        Util.createInputFile(cluster, "table_bs_ac", input);

        // test with BinStorage
        pigServer.registerQuery("a = load 'table_bs_ac';");
        String output = "/pig/out/TestEvalPipeline2_BinStorageByteArrayCasts";
        pigServer.deleteFile(output);
        pigServer.store("a", output, BinStorage.class.getName());

        pigServer.registerQuery("b = load '" + output + "' using BinStorage('Utf8StorageConverter') "
                + "as (name: int, age: int, gpa: float, lage: long, dgpa: double, bi:biginteger, bd:bigdecimal);");

        Iterator<Tuple> it = pigServer.openIterator("b");

        Tuple tup=null;

        // I have separately verified only few of the successful conversions,
        // assuming the rest are correct.
        // It is primarily testing if null is being returned when conversions
        // are expected to fail

        //tuple 1
        tup = it.next();

        Assert.assertTrue((Integer)tup.get(0) == null);
        Assert.assertTrue((Integer)tup.get(1) == 12);
        Assert.assertTrue((Float)tup.get(2) == 1.1F);
        Assert.assertTrue((Long)tup.get(3) == 231L);
        Assert.assertTrue((Double)tup.get(4) == 234.0);
        Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("3024123"));
        Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("3.2492"));

        //tuple 2
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);
        Assert.assertTrue((Integer)tup.get(1) == 1231);
        Assert.assertTrue((Float)tup.get(2) == 123.4F);
        Assert.assertTrue((Long)tup.get(3) == 12345678L);
        Assert.assertTrue((Double)tup.get(4) == 1234.567);
        Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("5081123453"));
        Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("9.181817"));

        //tuple 3
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);
        Assert.assertTrue((Integer)tup.get(1) == 1232123);
        Assert.assertTrue((Float)tup.get(2) == 1.45345F);
        Assert.assertTrue((Long)tup.get(3) == 123456789L);
        Assert.assertTrue((Double)tup.get(4) == 1.234567899E8);
        Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("1234567"));
        Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("1.234567"));

        Util.deleteFile(cluster, "table");
    }
    @Test
    public void testBinStorageByteArrayCastsComplexBag() throws IOException {
        // Test for PIG-544 fix

        // Tries to read data in BinStorage bytearrays as other pig bags,
        // should return null if the conversion fails.

        String[] input = {
                "{(asdf)}",
                "{(2344)}",
                "{(2344}",
                "{(323423423423434)}",
                "{(323423423423434L)}",
                "{(asdff)}"
        };

        Util.createInputFile(cluster, "table_bs_ac_clx", input);

        // test with BinStorage
        pigServer.registerQuery("a = load 'table_bs_ac_clx' as (f1);");
        pigServer.registerQuery("b = foreach a generate (bag{tuple(int)})f1;");

        Iterator<Tuple> it = pigServer.openIterator("b");

        Tuple tup=null;

        //tuple 1
        tup = it.next();
        Assert.assertTrue(tup.get(0) != null);

        //tuple 2
        tup = it.next();
        Assert.assertTrue(tup.get(0) != null);

        //tuple 3 - malformed
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);

        //tuple 4 - integer exceeds size limit
        tup = it.next();
        Assert.assertTrue(tup.get(0) instanceof DataBag);
        DataBag db = (DataBag)tup.get(0);
        Assert.assertTrue(db.iterator().hasNext());
        Tuple innerTuple = db.iterator().next();
        Assert.assertTrue(innerTuple.get(0)==null);

        //tuple 5
        tup = it.next();
        Assert.assertTrue(tup.get(0) != null);

        //tuple 6
        tup = it.next();
        Assert.assertTrue(tup.get(0) != null);

        Util.deleteFile(cluster, "table_bs_ac_clx");
    }
    @Test
    public void testBinStorageByteArrayCastsComplexTuple() throws IOException {
        // Test for PIG-544 fix

        // Tries to read data in BinStorage bytearrays as other pig bags,
        // should return null if the conversion fails.

        String[] input = {
                "(123)",
                "((123)",
                "(123123123123)",
                "(asdf)"
        };

        Util.createInputFile(cluster, "table_bs_ac_clxt", input);

        // test with BinStorage
        pigServer.registerQuery("a = load 'table_bs_ac_clxt' as (t:tuple(t:tuple(i:int)));");
        Iterator<Tuple> it = pigServer.openIterator("a");

        Tuple tup=null;

        //tuple 1
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);

        //tuple 2 -malformed tuple
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);

        //tuple 3 - integer exceeds size limit
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);

        //tuple 4
        tup = it.next();
        Assert.assertTrue(tup.get(0) == null);

        Util.deleteFile(cluster, "table_bs_ac_clxt");
    }

    @Test
    public void testPigStorageWithCtrlChars() throws Exception {
        String[] inputData = { "hello\u0001world", "good\u0001morning", "nice\u0001day" };
        Util.createInputFile(cluster, "testPigStorageWithCtrlCharsInput.txt", inputData);
        String script = "a = load 'testPigStorageWithCtrlCharsInput.txt' using PigStorage('\u0001');" +
        		"b = foreach a generate $0, CONCAT($0, '\u0005'), $1; " +
        		"store b into 'testPigStorageWithCtrlCharsOutput.txt' using PigStorage('\u0001');" +
        		"c = load 'testPigStorageWithCtrlCharsOutput.txt' using PigStorage('\u0001') as (f1:chararray, f2:chararray, f3:chararray);";
        Util.registerMultiLineQuery(pigServer, script);
        Iterator<Tuple> it  = pigServer.openIterator("c");
        HashMap<String, Tuple> expectedResults = new HashMap<String, Tuple>();
        expectedResults.put("hello", (Tuple) Util.getPigConstant("('hello','hello\u0005','world')"));
        expectedResults.put("good", (Tuple) Util.getPigConstant("('good','good\u0005','morning')"));
        expectedResults.put("nice", (Tuple) Util.getPigConstant("('nice','nice\u0005','day')"));
        HashMap<String, Boolean> seen = new HashMap<String, Boolean>();
        int numRows = 0;
        while(it.hasNext()) {
            Tuple t = it.next();
            String firstCol = (String) t.get(0);
            Assert.assertFalse(seen.containsKey(firstCol));
            seen.put(firstCol, true);
            Assert.assertEquals(expectedResults.get(firstCol), t);
            numRows++;
        }
        Assert.assertEquals(3, numRows);
        Util.deleteFile(cluster, "testPigStorageWithCtrlCharsInput.txt");
    }

    @Test
    // Test case added for PIG-850
    public void testLimitedSortWithDump() throws Exception{
        int LOOP_COUNT = 40;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        Random r = new Random(1);
        int rand;
        for(int i = 0; i < LOOP_COUNT; i++) {
            rand = r.nextInt(100);
            ps.println(rand);
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigServer
                        .getPigContext()) + "' AS (num:int);");
        pigServer.registerQuery("B = order A by num parallel 2;");
        pigServer.registerQuery("C = limit B 10;");
        Iterator<Tuple> result = pigServer.openIterator("C");

        int numIdentity = 0;
        while (result.hasNext())
        {
            result.next();
            ++numIdentity;
        }
        Assert.assertEquals(10, numIdentity);
    }

    @Test
    public void testLimitAfterSort() throws Exception{
        int LOOP_COUNT = 40;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        Random r = new Random(1);
        int rand;
        for(int i = 0; i < LOOP_COUNT; i++) {
            rand = r.nextInt(100);
            ps.println(rand);
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigServer
                        .getPigContext()) + "' AS (num:int);");
        pigServer.registerQuery("B = order A by num parallel 2;");
        pigServer.registerQuery("C = limit B 10;");
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numIdentity = 0;
        int oldNum = Integer.MIN_VALUE;
        int newNum;
        while(iter.hasNext()){
            Tuple t = iter.next();
            newNum = (Integer)t.get(0);
            Assert.assertTrue(newNum>=oldNum);
            oldNum = newNum;
            ++numIdentity;
        }
        Assert.assertEquals(10, numIdentity);
    }

    @Test
    public void testLimitAfterSortDesc() throws Exception{
        int LOOP_COUNT = 40;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        Random r = new Random(1);
        int rand;
        for(int i = 0; i < LOOP_COUNT; i++) {
            rand = r.nextInt(100);
            ps.println(rand);
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigServer
                        .getPigContext()) + "' AS (num:int);");
        pigServer.registerQuery("B = order A by num desc parallel 2;");
        pigServer.registerQuery("C = limit B 10;");
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numIdentity = 0;
        int oldNum = Integer.MAX_VALUE;
        int newNum;
        while(iter.hasNext()){
            Tuple t = iter.next();
            newNum = (Integer)t.get(0);
            Assert.assertTrue(newNum<=oldNum);
            oldNum = newNum;
            ++numIdentity;
        }
        Assert.assertEquals(10, numIdentity);
    }

    @Test
    // See PIG-894
    public void testEmptySort() throws Exception{
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigServer
                        .getPigContext()) + "';");
        pigServer.registerQuery("B = order A by $0;");
        Iterator<Tuple> iter = pigServer.openIterator("B");

        Assert.assertTrue(iter.hasNext()==false);
    }

    // See PIG-761
    @Test
    public void testLimitPOPackageAnnotator() throws Exception{
        File tmpFile1 = Util.createTempFileDelOnExit("test1", "txt");
        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
        ps1.println("1\t2\t3");
        ps1.println("2\t5\t2");
        ps1.close();

        File tmpFile2 = Util.createTempFileDelOnExit("test2", "txt");
        PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
        ps2.println("1\t1");
        ps2.println("2\t2");
        ps2.close();

        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
        pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
        pigServer.registerQuery("C = LIMIT B 100;");
        pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
        Iterator<Tuple> iter = pigServer.openIterator("D");

        if (Util.isSparkExecType(cluster.getExecType())) {
            String[] expectedResults =
                new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" };
            Util.checkQueryOutputs(iter, expectedResults,
                org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), Util.isSparkExecType(cluster.getExecType()));
        } else {
            Assert.assertTrue(iter.hasNext());
            Tuple t = iter.next();

            Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));

            Assert.assertTrue(iter.hasNext());
            t = iter.next();

            Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));

            Assert.assertFalse(iter.hasNext());
        }
    }

    // See PIG-1195
    @Test
    public void testNestedDescSort() throws Exception{
        Util.createInputFile(cluster, "table_testNestedDescSort", new String[]{"3","4"});
        pigServer.registerQuery("A = LOAD 'table_testNestedDescSort' as (a0:int);");
        pigServer.registerQuery("B = group A ALL;");
        pigServer.registerQuery("C = foreach B { D = order A by a0 desc;generate D;};");
        Iterator<Tuple> iter = pigServer.openIterator("C");

        Assert.assertTrue(iter.hasNext());
        Tuple t = iter.next();

        Assert.assertTrue(t.toString().equals("({(4),(3)})"));
        Assert.assertFalse(iter.hasNext());

        Util.deleteFile(cluster, "table_testNestedDescSort");
    }

    // See PIG-972
    @Test
    public void testDescribeNestedAlias() throws Exception{
        String[] input = {
                "1\t3",
                "2\t4",
                "3\t5"
        };

        Util.createInputFile(cluster, "table_testDescribeNestedAlias", input);
        pigServer.registerQuery("A = LOAD 'table_testDescribeNestedAlias' as (a0, a1);");
        pigServer.registerQuery("P = GROUP A by a1;");
        // Test RelationalOperator
        pigServer.registerQuery("B = FOREACH P { D = ORDER A by $0; generate group, D.$0; };");

        // Test ExpressionOperator - negative test case
        pigServer.registerQuery("C = FOREACH A { D = a0/a1; E=a1/a0; generate E as newcol; };");
        Schema schema = pigServer.dumpSchemaNested("B", "D");
        Assert.assertTrue(schema.toString().equalsIgnoreCase("{a0: bytearray,a1: bytearray}"));
        try {
            schema = pigServer.dumpSchemaNested("C", "E");
        } catch (FrontendException e) {
            Assert.assertTrue(e.getErrorCode() == 1113);
        }
    }

    // See PIG-1484
    @Test
    public void testBinStorageCommaSeperatedPath() throws Exception{
        String[] input = {
                "1\t3",
                "2\t4",
                "3\t5"
        };

        Util.createInputFile(cluster, "table_simple1", input);
        pigServer.setBatchOn();
        pigServer.registerQuery("A = LOAD 'table_simple1' as (a0, a1);");
        pigServer.registerQuery("store A into 'table_simple1.bin' using BinStorage();");
        pigServer.registerQuery("store A into 'table_simple2.bin' using BinStorage();");

        pigServer.executeBatch();

        pigServer.registerQuery("A = LOAD 'table_simple1.bin,table_simple2.bin' using BinStorage();");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(1,3)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(2,4)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(3,5)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(1,3)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(2,4)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(3,5)"));

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1543
    @Test
    public void testEmptyBagIterator() throws Exception{
        String[] input1 = {
                "1",
                "1",
                "1"
        };

        String[] input2 = {
                "2",
                "2"
        };

        Util.createInputFile(cluster, "input1", input1);
        Util.createInputFile(cluster, "input2", input2);
        pigServer.registerQuery("A = load 'input1' as (a1:int);");
        pigServer.registerQuery("B = load 'input2' as (b1:int);");
        pigServer.registerQuery("C = COGROUP A by a1, B by b1;");
        pigServer.registerQuery("C1 = foreach C { Alim = limit A 1; Blim = limit B 1; generate Alim, Blim; };");
        pigServer.registerQuery("D1 = FOREACH C1 generate Alim,Blim, (IsEmpty(Alim)? 0:1), (IsEmpty(Blim)? 0:1), COUNT(Alim), COUNT(Blim);");

        Iterator<Tuple> iter = pigServer.openIterator("D1");

        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {
                        "({(1)},{},1,0,1L,0L)",
                        "({},{(2)},0,1,0L,1L)" });
        Util.checkQueryOutputsAfterSort(iter, expectedResults);
    }

    // See PIG-1669
    @Test
    public void testPushUpFilterScalar() throws Exception{
        String[] input1 = {
                "jason\t14\t4.7",
                "jack\t18\t4.6"
        };

        String[] input2 = {
                "jason\t14",
                "jack\t18"
        };

        Util.createInputFile(cluster, "table_PushUpFilterScalar1", input1);
        Util.createInputFile(cluster, "table_PushUpFilterScalar2", input2);
        pigServer.registerQuery("A = load 'table_PushUpFilterScalar1' as (name, age, gpa);");
        pigServer.registerQuery("B = load 'table_PushUpFilterScalar2' as (name, age);");
        pigServer.registerQuery("C = filter A by age < 20;");
        pigServer.registerQuery("D = filter B by age < 20;");
        pigServer.registerQuery("simple_scalar = limit D 1;");
        pigServer.registerQuery("E = join C by name, D by name;");
        pigServer.registerQuery("F = filter E by C::age==(int)simple_scalar.age;");

        Iterator<Tuple> iter = pigServer.openIterator("F");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(jason,14,4.7,jason,14)"));

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1683
    @Test
    public void testDuplicateReferenceInnerPlan() throws Exception{
        String[] input1 = {
                "1\t1\t1",
        };

        String[] input2 = {
                "1\t1",
                "2\t2"
        };

        Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan1", input1);
        Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan2", input2);
        pigServer.registerQuery("a = load 'table_testDuplicateReferenceInnerPlan1' as (a0, a1, a2);");
        pigServer.registerQuery("b = load 'table_testDuplicateReferenceInnerPlan2' as (b0, b1);");
        pigServer.registerQuery("c = join a by a0, b by b0;");
        pigServer.registerQuery("d = foreach c {d0 = a::a1;d1 = a::a2;generate ((d0 is not null)? d0 : d1);};");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(1)"));

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1719
    @Test
    public void testBinCondSchema() throws IOException {
        String[] inputData = new String[] {"hello world\t2"};
        Util.createInputFile(cluster, "table_testSchemaSerialization.txt", inputData);
        pigServer.registerQuery("a = load 'table_testSchemaSerialization.txt' as (a0:chararray, a1:int);");
        pigServer.registerQuery("b = foreach a generate FLATTEN((a1<=1?{('null')}:TOKENIZE(a0)));");
        pigServer.registerQuery("c = foreach b generate UPPER($0);");

        Iterator<Tuple> it = pigServer.openIterator("c");
        Tuple t = it.next();
        Assert.assertTrue(t.get(0).equals("HELLO"));
        t = it.next();
        Assert.assertTrue(t.get(0).equals("WORLD"));
    }

    // See PIG-1721
    @Test
    public void testDuplicateInnerAlias() throws Exception{
        String[] input1 = {
                "1\t[key1#5]", "1\t[key2#5]", "2\t[key1#3]"
        };

        Util.createInputFile(cluster, "table_testDuplicateInnerAlias", input1);
        pigServer.registerQuery("a = load 'table_testDuplicateInnerAlias' as (a0:int, a1:map[]);");
        pigServer.registerQuery("b = filter a by a0==1;");
        pigServer.registerQuery("c = foreach b { b0 = a1#'key1'; generate ((b0 is null or b0 == '')?1:0);};");

        Iterator<Tuple> iter = pigServer.openIterator("c");

        Tuple t = iter.next();
        Assert.assertTrue((Integer)t.get(0)==0);

        t = iter.next();
        Assert.assertTrue((Integer)t.get(0)==1);

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-3379
    @Test
    public void testNestedOperatorReuse() throws Exception{
        String[] input1 = {
        		"60000\tdv1\txuaHeartBeat",
        		"70000\tdv2\txuaHeartBeat",
        		"80000\tdv1\txuaPowerOff",
        		"90000\tdv1\txuaPowerOn",
        		"110000\tdv2\txuaHeartBeat",
        		"120000\tdv2\txuaPowerOff",
        		"140000\tdv2\txuaPowerOn",
        		"150000\tdv1\txuaHeartBeat",
        		"160000\tdv2\txuaHeartBeat",
        		"250000\tdv1\txuaHeartBeat",
        		"310000\tdv2\txuaPowerOff",
        		"360000\tdv1\txuaPowerOn",
        		"420000\tdv3\txuaHeartBeat",
        		"450000\tdv3\txuaHeartBeat",
        		"540000\tdv4\txuaPowerOn",
        		"550000\tdv3\txuaHeartBeat",
        		"560000\tdv5\txuaHeartBeat" };
        Util.createInputFile( cluster, "table_testNestedOperatorReuse", input1 );
        String query = "Events = LOAD 'table_testNestedOperatorReuse' AS (eventTime:long, deviceId:chararray, eventName:chararray);" +
        		"Events = FOREACH Events GENERATE eventTime, deviceId, eventName;" +
        		"EventsPerMinute = GROUP Events BY (eventTime / 60000);" +
        		"EventsPerMinute = FOREACH EventsPerMinute {" +
        		"  DistinctDevices = DISTINCT Events.deviceId;" +
        		"  nbDevices = SIZE(DistinctDevices);" +
        		"  DistinctDevices = FILTER Events BY eventName == 'xuaHeartBeat';" +
        		"  nbDevicesWatching = SIZE(DistinctDevices);" +
        		"  GENERATE $0*60000 as timeStamp, nbDevices as nbDevices, nbDevicesWatching as nbDevicesWatching;" +
        		"}" +
        		"EventsPerMinute = FILTER EventsPerMinute BY timeStamp >= 0  AND timeStamp < 300000;";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("EventsPerMinute");

        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
            new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"});

        Util.checkQueryOutputs(iter, expectedResults, Util.isSparkExecType(cluster.getExecType()));
    }

    // See PIG-1729
    @Test
    public void testDereferenceInnerPlan() throws Exception{
        String[] input1 = {
                "1\t2\t3"
        };

        String[] input2 = {
                "1\t1"
        };

        Util.createInputFile(cluster, "table_testDereferenceInnerPlan1", input1);
        Util.createInputFile(cluster, "table_testDereferenceInnerPlan2", input2);
        pigServer.registerQuery("a = load 'table_testDereferenceInnerPlan1' as (a0:int, a1:int, a2:int);");
        pigServer.registerQuery("b = load 'table_testDereferenceInnerPlan2' as (b0:int, b1:int);");
        pigServer.registerQuery("c = cogroup a by a0, b by b0;");
        pigServer.registerQuery("d = foreach c generate ((COUNT(a)==0L)?null : a.a0) as d0;");
        pigServer.registerQuery("e = foreach d generate flatten(d0);");
        pigServer.registerQuery("f = group e all;");

        Iterator<Tuple> iter = pigServer.openIterator("f");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(all,{(1)})"));

        Assert.assertFalse(iter.hasNext());
    }

    @Test
    // See PIG-1725
    public void testLOGenerateSchema() throws Exception{
        String[] input1 = {
                "1\t2\t{(1)}",
        };

        Util.createInputFile(cluster, "table_testLOGenerateSchema", input1);
        pigServer.registerQuery("a = load 'table_testLOGenerateSchema' as (a0:int, a1, a2:bag{});");
        pigServer.registerQuery("b = foreach a generate a0 as b0, a1 as b1, flatten(a2) as b2:int;");
        pigServer.registerQuery("c = filter b by b0==1;");
        pigServer.registerQuery("d = foreach c generate b0+1, b2;");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(2,1)"));

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1737
    @Test
    public void testMergeSchemaErrorMessage() throws IOException {
        try {
            pigServer.registerQuery("a = load '1.txt' as (a0, a1, a2);");
            pigServer.registerQuery("b = group a by (a0, a1);");
            pigServer.registerQuery("c = foreach b generate flatten(group) as c0;");
            pigServer.openIterator("c");
        } catch (Exception e) {
            PigException pe = LogUtils.getPigException(e);
            Util.checkStrContainsSubStr(pe.getMessage(), "Incompatible schema");
            return;
        }
        Assert.fail();
    }

    // See PIG-1732
    @Test
    public void testForEachDupColumn() throws Exception{
        String[] input1 = {
                "1\t2",
        };

        String[] input2 = {
                "1\t1\t3",
                "2\t4\t2"
        };

        Util.createInputFile(cluster, "table_testForEachDupColumn1", input1);
        Util.createInputFile(cluster, "table_testForEachDupColumn2", input2);
        pigServer.registerQuery("a = load 'table_testForEachDupColumn1' as (a0, a1:int);");
        pigServer.registerQuery("b = load 'table_testForEachDupColumn2' as (b0, b1:int, b2);");
        pigServer.registerQuery("c = foreach a generate a0, a1, a1 as a2;");
        pigServer.registerQuery("d = union b, c;");
        pigServer.registerQuery("e = foreach d generate $1;");

        Iterator<Tuple> iter = pigServer.openIterator("e");

        Map<Object, Object> expected = new HashMap<Object, Object>(3);
        expected.put(1, null);
        expected.put(2, null);
        expected.put(4, null);

        Tuple t = iter.next();
        Assert.assertTrue(t.size()==1);
        Assert.assertTrue(expected.containsKey(t.get(0)));

        t = iter.next();
        Assert.assertTrue(t.size()==1);
        Assert.assertTrue(expected.containsKey(t.get(0)));

        t = iter.next();
        Assert.assertTrue(t.size()==1);
        Assert.assertTrue(expected.containsKey(t.get(0)));

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1745
    @Test
    public void testBinStorageByteCast() throws Exception{
        String[] input1 = {
                "1\t2\t3",
        };

        Util.createInputFile(cluster, "table_testBinStorageByteCast", input1);
        pigServer.registerQuery("a = load 'table_testBinStorageByteCast' as (a0, a1, a2);");
        pigServer.store("a", "table_testBinStorageByteCast.temp", BinStorage.class.getName());

        pigServer.registerQuery("a = load 'table_testBinStorageByteCast.temp' using BinStorage() as (a0, a1, a2);");
        pigServer.registerQuery("b = foreach a generate (long)a0;");

        try {
            pigServer.openIterator("b");
        } catch (Exception e) {
            PigException pe = LogUtils.getPigException(e);
            //This changes in hadoop 23, we get error code 2997
            //Assert.assertTrue(pe.getErrorCode()==1118);
            return;
        }

        Assert.fail();
    }

    // See PIG-1761
    @Test
    public void testBagDereferenceInMiddle1() throws Exception{
        String[] input1 = {
                "foo@apache#44",
        };

        Util.createInputFile(cluster, "table_testBagDereferenceInMiddle1", input1);
        pigServer.registerQuery("a = load 'table_testBagDereferenceInMiddle1' as (a0:chararray);");
        pigServer.registerQuery("b = foreach a generate UPPER(REGEX_EXTRACT_ALL(a0, '.*@(.*)#.*').$0);");

        Iterator<Tuple> iter = pigServer.openIterator("b");
        Tuple t = iter.next();
        Assert.assertTrue(t.size()==1);
        Assert.assertTrue(t.get(0).equals("APACHE"));
    }

    // See PIG-1843
    @Test
    public void testBagDereferenceInMiddle2() throws Exception{
        String[] input1 = {
                "foo apache",
        };

        Util.createInputFile(cluster, "table_testBagDereferenceInMiddle2", input1);
        pigServer.registerQuery("a = load 'table_testBagDereferenceInMiddle2' as (a0:chararray);");
        pigServer.registerQuery("b = foreach a generate " + MapGenerate.class.getName() + " (STRSPLIT(a0).$0);");

        Iterator<Tuple> iter = pigServer.openIterator("b");
        Tuple t = iter.next();
        Assert.assertTrue(t.size()==1);
        Assert.assertTrue(t.toString().equals("([key#1])"));
    }

    // See PIG-1766
    @Test
    public void testForEachSameOriginColumn1() throws Exception {
        String[] input1 = {
                "1\t2",
                "1\t3",
                "2\t4",
                "2\t5",
        };

        String[] input2 = {
                "1\tone",
                "2\ttwo",
        };

        Util.createInputFile(cluster, "table_testForEachSameOriginColumn1_1", input1);
        Util.createInputFile(cluster, "table_testForEachSameOriginColumn1_2", input2);
        pigServer.registerQuery("A = load 'table_testForEachSameOriginColumn1_1' AS (a0:int, a1:int);");
        pigServer.registerQuery("B = load 'table_testForEachSameOriginColumn1_2' AS (b0:int, b1:chararray);");
        pigServer.registerQuery("C = join A by a0, B by b0;");
        pigServer.registerQuery("D = foreach B generate b0 as d0, b1 as d1;");
        pigServer.registerQuery("E = join C by a1, D by d0;");
        pigServer.registerQuery("F = foreach E generate b1, d1;");

        Iterator<Tuple> iter = pigServer.openIterator("F");
        Tuple t = iter.next();
        Assert.assertTrue(t.size()==2);
        Assert.assertTrue(t.get(0).equals("one"));
        Assert.assertTrue(t.get(1).equals("two"));
    }

    // See PIG-1771
    @Test
    public void testLoadWithDifferentSchema() throws Exception{
        String[] input1 = {
                "hello\thello\t(hello)\t[key#value]",
        };

        Util.createInputFile(cluster, "table_testLoadWithDifferentSchema1", input1);
        pigServer.registerQuery("a = load 'table_testLoadWithDifferentSchema1' as (a0:chararray, a1:chararray, a2, a3:map[]);");
        pigServer.store("a", "table_testLoadWithDifferentSchema1.bin", "org.apache.pig.builtin.BinStorage");

        pigServer.registerQuery("b = load 'table_testLoadWithDifferentSchema1.bin' USING BinStorage('Utf8StorageConverter') AS (b0:chararray, b1:chararray, b2:tuple(), b3:map[]);");
        Iterator<Tuple> iter = pigServer.openIterator("b");

        Tuple t = iter.next();
        Assert.assertTrue(t.size()==4);
        Assert.assertTrue(t.toString().equals("(hello,hello,(hello),[key#value])"));
    }

    static public class MapGenerate extends EvalFunc<Map<String, Integer>> {
        @Override
        public Map<String, Integer> exec(Tuple input) throws IOException {
            Map<String, Integer> m = new HashMap<String, Integer>();
            m.put("key", new Integer(input.size()));
            return m;
        }

        @Override
        public Schema outputSchema(Schema input) {
            return new Schema(new Schema.FieldSchema(getSchemaName("parselong", input), DataType.MAP));
        }
    }

    // See PIG-1277
    @Test
    public void testWrappingUnknownKey1() throws Exception{
        String[] input1 = {
                "1",
        };

        Util.createInputFile(cluster, "table_testWrappingUnknownKey1", input1);

        pigServer.registerQuery("a = load 'table_testWrappingUnknownKey1' as (a0);");
        pigServer.registerQuery("b = foreach a generate a0, "+ MapGenerate.class.getName() + "(*) as m:map[];");
        pigServer.registerQuery("c = foreach b generate a0, m#'key' as key;");
        pigServer.registerQuery("d = group c by key;");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.size()==2);
        Assert.assertTrue(t.toString().equals("(1,{(1,1)})"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-999
    @Test
    public void testWrappingUnknownKey2() throws Exception{
        String[] input1 = {
                "1",
        };

        Util.createInputFile(cluster, "table_testWrappingUnknownKey2", input1);

        pigServer.registerQuery("a = load 'table_testWrappingUnknownKey2' as (a0);");
        pigServer.registerQuery("b = foreach a generate a0, "+ MapGenerate.class.getName() + "(*) as m:map[];");
        pigServer.registerQuery("c = foreach b generate a0, m#'key' as key;");
        pigServer.registerQuery("d = order c by key;");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.size()==2);
        Assert.assertTrue(t.toString().equals("(1,1)"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1065
    @Test
    public void testWrappingUnknownKey3() throws Exception{
        String[] input1 = {
                "1\t2",
                "2\t3"
        };

        String[] input2 = {
                "1",
        };

        Util.createInputFile(cluster, "table_testWrappingUnknownKey3_1", input1);
        Util.createInputFile(cluster, "table_testWrappingUnknownKey3_2", input2);

        pigServer.registerQuery("a = load 'table_testWrappingUnknownKey3_1' as (a0:chararray, a1:chararray);");
        pigServer.registerQuery("b = load 'table_testWrappingUnknownKey3_2' as (b0:chararray);");
        pigServer.registerQuery("c = union a, b;");
        pigServer.registerQuery("d = order c by $0;");

        Collection<String> results = new HashSet<String>();
        results.add("(1,2)");
        results.add("(1)");
        results.add("(2,3)");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(results.contains(t.toString()));
        t = iter.next();
        Assert.assertTrue(results.contains(t.toString()));
        t = iter.next();
        Assert.assertTrue(results.contains(t.toString()));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1787
    @Test
    public void testOrderByLimitJoin() throws Exception{
        String[] input1 = {
                "1\t1",
                "1\t2"
        };

        Util.createInputFile(cluster, "table_testOrderByLimitJoin", input1);

        pigServer.registerQuery("a = load 'table_testOrderByLimitJoin' as (a0, a1);");
        pigServer.registerQuery("b = group a by a0;");
        pigServer.registerQuery("c = foreach b generate group as c0, COUNT(a) as c1;");
        pigServer.registerQuery("d = order c by c1 parallel 2;");
        pigServer.registerQuery("e = limit d 10;");
        pigServer.registerQuery("f = join e by c0, a by a0;");

        Iterator<Tuple> iter = pigServer.openIterator("f");

        String[] expected = new String[] {"(1,2,1,1)", "(1,2,1,2)"};

        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("f")));

    }

    // See PIG-1785
    @Test
    public void testForEachSameOriginColumn2() throws Exception{
        String[] input1 = {
                "{(1,2),(2,3)}",
        };

        Util.createInputFile(cluster, "table_testForEachSameOriginColumn2", input1);

        pigServer.registerQuery("a = load 'table_testForEachSameOriginColumn2' as (a0:bag{t:tuple(i0:int, i1:int)});");
        pigServer.registerQuery("b = foreach a generate flatten(a0) as (b0, b1), flatten(a0) as (b2, b3);");
        pigServer.registerQuery("c = filter b by b0>b2;");

        Iterator<Tuple> iter = pigServer.openIterator("c");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().contains("(2,3,1,2)"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1785
    @Test
    public void testForEachSameOriginColumn3() throws Exception{
        String[] input1 = {
                "1\t1\t2",
                "1\t2\t3",
        };

        Util.createInputFile(cluster, "table_testForEachSameOriginColumn3", input1);

        pigServer.registerQuery("a = load 'table_testForEachSameOriginColumn3' as (a0:int, a1:int, a2:int);");
        pigServer.registerQuery("b = group a by a0;");
        pigServer.registerQuery("c = foreach b generate flatten(a.(a1,a2)) as (b0, b1), flatten(a.(a1,a2)) as (b2, b3);");
        pigServer.registerQuery("d = filter c by b0>b2;");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().contains("(2,3,1,2)"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1785
    @Test
    public void testAddingTwoBag() {
        try {
            pigServer.registerQuery("a = load '1.txt' as (name:chararray, age:int, gpa:double);");
            pigServer.registerQuery("b = group a by name;");
            pigServer.registerQuery("c = foreach b generate group, SUM(a.age*a.gpa);");
            pigServer.openIterator("c");
        } catch (Exception e) {
            PigException pe = LogUtils.getPigException(e);
            Assert.assertTrue(pe.getErrorCode()==1039);
            Assert.assertTrue(pe.getMessage().contains("incompatible types"));
            return;
        }
        Assert.fail();
    }

    public static class BagGenerateNoSchema extends EvalFunc<DataBag> {
        @Override
        public DataBag exec(Tuple input) throws IOException {
            DataBag bg = DefaultBagFactory.getInstance().newDefaultBag();
            bg.add(input);
            return bg;
        }
    }

    // See PIG-1813
    @Test
    public void testUDFNoSchemaPropagate1() throws Exception{
        String[] input1 = {
                "[key#1,key2#2]",
                "[key#2,key2#3]",
        };

        Util.createInputFile(cluster, "table_testUDFNoSchemaPropagate1", input1);

        pigServer.registerQuery("a = load 'table_testUDFNoSchemaPropagate1' as (a0:map[]);");
        pigServer.registerQuery("b = foreach a generate " + BagGenerateNoSchema.class.getName() + "(*) as b0;");
        pigServer.registerQuery("c = foreach b generate flatten(IdentityColumn(b0));");
        pigServer.registerQuery("d = foreach c generate $0#'key';");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().contains("(1)"));
        t = iter.next();
        Assert.assertTrue(t.toString().contains("(2)"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1813
    @Test
    public void testUDFNoSchemaPropagate2() throws Exception{
        String[] input1 = {
                "[key#1,key2#2]",
                "[key#2,key2#3]",
        };

        Util.createInputFile(cluster, "table_testUDFNoSchemaPropagate2", input1);

        pigServer.registerQuery("a = load 'table_testUDFNoSchemaPropagate2' as (a0:map[]);");
        pigServer.registerQuery("b = foreach a generate flatten(" + BagGenerateNoSchema.class.getName() + "(*)) as b0;");
        pigServer.registerQuery("c = foreach b generate IdentityColumn(b0);");
        pigServer.registerQuery("d = foreach c generate $0#'key';");

        Iterator<Tuple> iter = pigServer.openIterator("d");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().contains("(1)"));
        t = iter.next();
        Assert.assertTrue(t.toString().contains("(2)"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1812
    @Test
    public void testLocalRearrangeInReducer() throws Exception{
        String[] input1 = {
                "1\t1",
                "1\t1",
                "1\t2",
        };

        String[] input2 = {
                "1\t1",
        };

        Util.createInputFile(cluster, "table_testLocalRearrangeInReducer1", input1);
        Util.createInputFile(cluster, "table_testLocalRearrangeInReducer2", input2);

        pigServer.registerQuery("a = load 'table_testLocalRearrangeInReducer1' as (a0, a1);");
        pigServer.registerQuery("b = distinct a;");
        pigServer.registerQuery("c = load 'table_testLocalRearrangeInReducer2' as (c0, c1);");
        pigServer.registerQuery("d = cogroup b by a0, c by c0;");
        pigServer.registerQuery("e = foreach d { e1 = order c by c1; generate e1;};");

        Iterator<Tuple> iter = pigServer.openIterator("e");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().contains("({(1,1)})"));
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1850
    @Test
    public void testProjectNullSchema() throws Exception{
        String[] input = {
                "0\t1",
        };

        Util.createInputFile(cluster, "table_testProjectNullSchema", input);

        pigServer.registerQuery("a = load 'table_testProjectNullSchema';");
        pigServer.registerQuery("b = foreach a generate ASIN($0), $1;");
        pigServer.registerQuery("c = order b by $0;");

        Iterator<Tuple> iter = pigServer.openIterator("c");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().contains("(0.0,1)"));
        Assert.assertFalse(iter.hasNext());
    }


    // See PIG-1188
    @Test
    public void testSchemaDataNotMatch() throws Exception{
        String[] input = {
                "0\t1\t2",
                "3\t4",
                "5"
        };

        Util.createInputFile(cluster, "table_testSchemaDataNotMatch", input);

        pigServer.registerQuery("a = load 'table_testSchemaDataNotMatch' as (a0, a1);");

        Iterator<Tuple> iter = pigServer.openIterator("a");

        Tuple t = iter.next();
        Assert.assertTrue(t.size()==2);
        Assert.assertTrue(t.get(0).toString().equals("0"));
        Assert.assertTrue(t.get(1).toString().equals("1"));

        t = iter.next();
        Assert.assertTrue(t.size()==2);
        Assert.assertTrue(t.get(0).toString().equals("3"));
        Assert.assertTrue(t.get(1).toString().equals("4"));

        t = iter.next();
        Assert.assertTrue(t.size()==2);
        Assert.assertTrue(t.get(0).toString().equals("5"));
        Assert.assertTrue(t.get(1)==null);

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1912
    @Test
    public void testDuplicateLoadFuncSignature() throws Exception{
        String[] input = {
                "0\t1\ta",
        };

        Util.createInputFile(cluster, "table_testDuplicateLoadFuncSignature", input);
        pigServer.setBatchOn();
        pigServer.registerQuery("a = load 'table_testDuplicateLoadFuncSignature' as (a0, a1, a2);");
        pigServer.registerQuery("b = foreach a generate a0, a1;");
        pigServer.registerQuery("a = load 'table_testDuplicateLoadFuncSignature' as (a0, a1, a2);");
        pigServer.registerQuery("c = foreach a generate a0, a2;");
        pigServer.registerQuery("store b into 'testDuplicateLoadFuncSignatureOutput1';");
        pigServer.registerQuery("store c into 'testDuplicateLoadFuncSignatureOutput2';");

        pigServer.executeBatch();

        pigServer.registerQuery("a = load 'testDuplicateLoadFuncSignatureOutput1';");
        Iterator<Tuple> iter = pigServer.openIterator("a");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(0,1)"));
        Assert.assertFalse(iter.hasNext());

        pigServer.registerQuery("a = load 'testDuplicateLoadFuncSignatureOutput2';");
        iter = pigServer.openIterator("a");

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(0,a)"));
        Assert.assertFalse(iter.hasNext());

    }

    // See PIG-1927
    @Test
    public void testDereferencePartialAlias() throws Exception{

        pigServer.registerQuery("a = load '1.txt' as (a0:int, a1);");
        pigServer.registerQuery("b = group a by a0;");
        pigServer.registerQuery("c = foreach b generate flatten(a);");
        pigServer.registerQuery("d = cogroup c by (a0);");
        pigServer.registerQuery("e = foreach d generate c.a0 as e0;");
        pigServer.registerQuery("f = foreach e generate e0;");

        // Shall not throw exception
        pigServer.explain("f", System.out);
    }

    // See PIG-1866
    @Test
    public void testProjBagInTuple() throws Exception{
        String[] input = {
                "(1,{(one),(two)})",
        };

        Util.createInputFile(cluster, "table_testProjBagInTuple", input);

        pigServer.registerQuery("a = load 'table_testProjBagInTuple' as (t : tuple(i: int, b1: bag { b_tuple : tuple ( b_str: chararray) }));");
        pigServer.registerQuery("b = foreach a generate t.b1;");

        Iterator<Tuple> iter = pigServer.openIterator("b");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("({(one),(two)})"));
        Assert.assertFalse(iter.hasNext());
    }

    //See PIG-1974
    @Test
    public void testCastMap() throws Exception{
        String[] input = {
                "([key#1])",
                "([key#2])",
        };

        Util.createInputFile(cluster, "table_testCastMap", input);

        pigServer.registerQuery("a = load 'table_testCastMap' as (m:map[]);");
        pigServer.registerQuery("b = foreach a generate (map[int])m;");
        pigServer.registerQuery("c = foreach b generate m#'key' + 1;");

        Iterator<Tuple> iter = pigServer.openIterator("c");

        Tuple t = iter.next();
        Assert.assertEquals(t.get(0), 2);
        t = iter.next();
        Assert.assertEquals(t.get(0), 3);
        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-1979
    @Test
    public void testDereferenceUidBug() throws Exception{
        String[] input1 = {
                "0\t0\t{(1,2)}\t1",
        };
        String[] input2 = {
                "0\t0",
        };

        Util.createInputFile(cluster, "table_testDereferenceUidBug1", input1);
        Util.createInputFile(cluster, "table_testDereferenceUidBug2", input2);
        pigServer.registerQuery("a = load 'table_testDereferenceUidBug1' as (a0:int, a1:int, a2:{t:(i0:int, i1:int)}, a3:int);");
        pigServer.registerQuery("b = foreach a generate a0, a1, a0+a1 as sum, a2.i0 as a2, a3;");
        pigServer.registerQuery("c = filter b by sum==0;");
        pigServer.registerQuery("d = load 'table_testDereferenceUidBug2' as (d0:int, d1:int);");
        pigServer.registerQuery("e = join c by a0, d by d0;");
        pigServer.registerQuery("f = foreach e generate c::a2;");

        Iterator<Tuple> iter = pigServer.openIterator("f");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("({(1)})"));
        Assert.assertFalse(iter.hasNext());
    }

    static public class UDFWithNonStandardType extends EvalFunc<Tuple>{
        @Override
        public Tuple exec(Tuple input) throws IOException {
            Tuple t = TupleFactory.getInstance().newTuple();
            t.append(new ArrayList<Integer>());
            return t;
        }
    }

    // See PIG-1826
    @Test
    public void testNonStandardData() throws Exception{
        String[] input1 = {
                "0",
        };

        Util.createInputFile(cluster, "table_testNonStandardData", input1);
        pigServer.registerQuery("a = load 'table_testNonStandardData' as (a0);");
        pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);");

        try {
            pigServer.openIterator("b");
            Assert.fail();
        } catch (Exception e) {
            // Tez does not construct exceptions from stacktrace as it will have multiple ones.
            // So e.getCause().getCause() will be null
            Throwable cause = e.getCause().getCause() ==  null ? e.getCause() : e.getCause().getCause();
            String message = cause.getMessage();
            Assert.assertTrue(message.contains(ArrayList.class.getName()));
        }
    }

    // See PIG-1826
    @Test
    public void testNonStandardDataWithoutFetch() throws Exception{
        Properties props = pigServer.getPigContext().getProperties();
        props.setProperty(PigConfiguration.PIG_OPT_FETCH, "false");
        String[] input1 = {
                "0",
        };
        try {
            Util.createInputFile(cluster, "table_testNonStandardDataWithoutFetch", input1);
            pigServer.registerQuery("a = load 'table_testNonStandardDataWithoutFetch' as (a0);");
            pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);");

            try {
                pigServer.openIterator("b");
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(ExceptionUtils.getRootCause(e).getMessage().contains(
                        "Unexpected data type " + ArrayList.class.getName() + " found in stream."));
            }
        }
        finally {
            props.setProperty(PigConfiguration.PIG_OPT_FETCH, "true");
        }
    }

    // See PIG-2078
    @Test
    public void testProjectNullBag() throws Exception{
        String[] input1 = {
                "{(1)}\t2",
                "\t3"
        };

        HashSet<String> optimizerRules = new HashSet<String>();
        optimizerRules.add("MergeForEach");
        pigServer.getPigContext().getProperties().setProperty(
                PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
                ObjectSerializer.serialize(optimizerRules));

        Util.createInputFile(cluster, "table_testProjectNullBag", input1);
        pigServer.registerQuery("a = load 'table_testProjectNullBag' as (a0:bag{}, a1:int);");
        pigServer.registerQuery("b = foreach a generate a0;");

        Iterator<Tuple> iter = pigServer.openIterator("b");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("({(1)})"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("()"));

        Assert.assertFalse(iter.hasNext());

        pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
    }

    // See PIG-2159
    @Test
    public void testUnionOnSchemaUidGeneration() throws Exception{
        String[] input1 = {
        		"100,101,102,103,104,105",
        		"110,111,112,113,114,115"
        };

        String[] input2 = {
        		"200,201,202,203,204,205",
        		"210,211,212,213,214,215"
        };

        String[] input0 = {
        		"200,201,202,203,204,205",
        		"210,211,212,213,214,215"
        };

        Util.createInputFile(cluster, "table_testUnionOnSchemaUidGeneration1", input1);
        Util.createInputFile(cluster, "table_testUnionOnSchemaUidGeneration2", input2);
        Util.createInputFile(cluster, "table_testUnionOnSchemaUidGeneration0", input0);

        pigServer.registerQuery("A = load 'table_testUnionOnSchemaUidGeneration1' using PigStorage(',')  as (f1:int,f2:int,f3:int,f4:long,f5:double);");
        pigServer.registerQuery("B = load 'table_testUnionOnSchemaUidGeneration2' using PigStorage(',')  as (f1:int,f2:int,f3:int,f4:long,f5:double);");
        pigServer.registerQuery("C = load 'table_testUnionOnSchemaUidGeneration0' using PigStorage(',')  as (f1:int,f2:int,f3:int);");
        pigServer.registerQuery("U = UNION ONSCHEMA A,B;");
        pigServer.registerQuery("J = join C by (f1,f2,f3) LEFT OUTER, U by (f1,f2,f3);");
        pigServer.registerQuery("Porj = foreach J generate C::f1 as f1 ,C::f2 as f2,C::f3 as f3,U::f4 as f4,U::f5 as f5;");
        pigServer.registerQuery("G = GROUP Porj by (f1,f2,f3,f5);");
        pigServer.registerQuery("Final = foreach G generate SUM(Porj.f4) as total;");

        Iterator<Tuple> iter = pigServer.openIterator("Final");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(203)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(213)"));

        Assert.assertFalse(iter.hasNext());

    }

    // See PIG-2185
    @Test
    public void testProjectEmptyBag() throws Exception{
        String[] input = {
                "{(12)}",
                "{(23)}",
                ""
        };

        Util.createInputFile(cluster, "table_testProjectEmptyBag", input);

        pigServer.registerQuery("A = load 'table_testProjectEmptyBag' as (bg:bag{});");
        pigServer.registerQuery("B = FOREACH A { x = FILTER bg BY $0 == '12'; GENERATE x; };");

        Iterator<Tuple> iter = pigServer.openIterator("B");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("({(12)})"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("({})"));

        Assert.assertTrue(iter.hasNext());

        t = iter.next();
        Assert.assertTrue(t.toString().equals("({})"));

        Assert.assertFalse(iter.hasNext());
    }

    // See PIG-2231
    @Test
    public void testLimitFlatten() throws Exception{
        String[] input = {
                "1\tA",
                "1\tB",
                "2\tC",
                "3\tD",
                "3\tE",
                "3\tF"
        };

        Util.createInputFile(cluster, "table_testLimitFlatten", input);

        pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);");
        pigServer.registerQuery("grouped = GROUP data BY k;");
        if (Util.isSparkExecType(cluster.getExecType())) {
            pigServer.registerQuery("grouped = ORDER grouped BY group;");
        }
        pigServer.registerQuery("selected = LIMIT grouped 2;");
        pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);");

        Iterator<Tuple> iter = pigServer.openIterator("flattened");

        String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};

        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
            org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
    }

    // See PIG-2237
    @Test
    public void testLimitAutoReducer() throws Exception{
        String[] input = {
                "1\tA",
                "4\tB",
                "2\tC",
                "3\tD",
                "6\tE",
                "5\tF"
        };

        Util.createInputFile(cluster, "table_testLimitAutoReducer", input);

        pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "16");
        pigServer.registerQuery("A = load 'table_testLimitAutoReducer';");
        pigServer.registerQuery("B = order A by $0;");
        pigServer.registerQuery("C = limit B 2;");

        Iterator<Tuple> iter = pigServer.openIterator("C");

        Tuple t = iter.next();
        Assert.assertTrue(t.toString().equals("(1,A)"));

        t = iter.next();
        Assert.assertTrue(t.toString().equals("(2,C)"));

        Assert.assertFalse(iter.hasNext());
    }

    @SuppressWarnings("unchecked")
    @Test
    public void testCrossAfterGroupAll() throws Exception{
        String[] input = {
                "1\tA",
                "2\tB",
                "3\tC",
                "4\tD",
        };

        Util.createInputFile(cluster, "table_testCrossAfterGroupAll", input);

        try {
            pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
            pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0:int, a1:chararray);");
            pigServer.registerQuery("B = group A all;");
            pigServer.registerQuery("C = foreach B generate COUNT(A);");
            pigServer.registerQuery("D = cross A, C;");
            Path output = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
            ExecJob job = pigServer.store("D", output.toString());
            FileSystem fs = output.getFileSystem(cluster.getConfiguration());
            FileStatus[] partFiles = fs.listStatus(output, new PathFilter() {
                @Override
                public boolean accept(Path path) {
                    if (path.getName().startsWith("part")) {
                        return true;
                    }
                    return false;
                }
            });

            if (Util.isSparkExecType(cluster.getExecType())) {
                // TODO: Fix this when we implement auto-parallelism in Spark
                Assert.assertTrue(partFiles.length == 1);
            } else {
                // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
                Assert.assertTrue(partFiles.length >= 2);
            }
            // Check the output
            Iterator<Tuple> iter = job.getResults();
            List<Tuple> results = new ArrayList<Tuple>();
            while (iter.hasNext()) {
                results.add(iter.next());
            }
            Collections.sort(results);
            Assert.assertEquals(4, results.size());
            Assert.assertEquals("(1,A,4)", results.get(0).toString());
            Assert.assertEquals("(2,B,4)", results.get(1).toString());
            Assert.assertEquals("(3,C,4)", results.get(2).toString());
            Assert.assertEquals("(4,D,4)", results.get(3).toString());
        } finally {
            pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
        }
    }

    // see PIG-4392
    @Test
    public void testRankWithEmptyReduce() throws Exception {
        Util.createInputFile(cluster, "table_testRankWithEmptyReduce", new String[]{"1\t2\t3", "4\t5\t6", "7\t8\t9"});
        pigServer.setDefaultParallel(4);

        pigServer.registerQuery("d = load 'table_testRankWithEmptyReduce' as (a:int, b:int, c:int);");
        pigServer.registerQuery("e = rank d by a parallel 4;");

        Iterator<Tuple> iter = pigServer.openIterator("e");

        Collection<String> results = new HashSet<String>();
        results.add("(1,1,2,3)");
        results.add("(2,4,5,6)");
        results.add("(3,7,8,9)");

        Assert.assertTrue(results.contains(iter.next().toString()));
        Assert.assertTrue(results.contains(iter.next().toString()));
        Assert.assertTrue(results.contains(iter.next().toString()));
        Assert.assertFalse(iter.hasNext());
    }
}
