/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.TextLoader;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.PigFile;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestEvalPipeline {
    private static PigServer pigServer;
    private static PigContext pigContext;
    private static Properties properties;
    private static MiniGenericCluster cluster;

    private TupleFactory mTf = TupleFactory.getInstance();
    private BagFactory mBf = BagFactory.getInstance();

    @Before
    public void setUp() throws Exception{
        pigServer = new PigServer(cluster.getExecType(), properties);
        pigContext = pigServer.getPigContext();
    }

    @BeforeClass
    public static void oneTimeSetup() {
        cluster = MiniGenericCluster.buildCluster();
        properties = cluster.getProperties();
    }

    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        cluster.shutDown();
    }

    static public class MyBagFunction extends EvalFunc<DataBag>{
        @Override
        public DataBag exec(Tuple input) throws IOException {
            TupleFactory tf = TupleFactory.getInstance();
            DataBag output = BagFactory.getInstance().newDefaultBag();
            output.add(tf.newTuple("a"));
            output.add(tf.newTuple("a"));
            output.add(tf.newTuple("a"));
            return output;
        }
    }

    @Test
    public void testFunctionInsideFunction() throws Exception{
        File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});

        pigServer.registerQuery("a = load '"
                + Util.generateURI(f1.toString(), pigContext)
                + "' using " + PigStorage.class.getName() + "(':');");
        pigServer.registerQuery("b = foreach a generate 1-1/1;");
        Iterator<Tuple> iter  = pigServer.openIterator("b");

        for (int i=0 ;i<3; i++){
            Assert.assertEquals(0.0d, DataType.toDouble(iter.next().get(0)).doubleValue(), 0.0d);
        }
    }

    @Test
    public void testJoin() throws Exception{
        File f1 = Util.createFile(new String[]{"a:1","b:1","a:1"});
        File f2 = Util.createFile(new String[]{"b","b","a"});

        pigServer.registerQuery("a = load '"
                + Util.generateURI(f1.toString(), pigContext) + "' using "
                + PigStorage.class.getName() + "(':');");
        pigServer.registerQuery("b = load '"
                + Util.generateURI(f2.toString(), pigContext) + "';");
        pigServer.registerQuery("c = cogroup a by $0, b by $0;");
        pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);");

        Iterator<Tuple> iter = pigServer.openIterator("d");
        int count = 0;
        while(iter.hasNext()){
            Tuple t = iter.next();
            Assert.assertTrue(t.get(0).toString().equals(t.get(2).toString()));
            count++;
        }
        Assert.assertEquals(count, 4);
    }

    @Test
    public void testDriverMethod() throws Exception{
        File f = Util.createTempFileDelOnExit("tmp", "");
        PrintWriter pw = new PrintWriter(f);
        pw.println("a");
        pw.println("a");
        pw.close();
        pigServer.registerQuery("a = foreach (load '"
                + Util.generateURI(f.toString(), pigContext) + "') "
                + "generate 1, flatten(" + MyBagFunction.class.getName() + "(*));");
        Iterator<Tuple> iter = pigServer.openIterator("a");
        int count = 0;
        while(iter.hasNext()){
            Tuple t = iter.next();
            Assert.assertTrue(t.get(0).toString().equals("1"));
            Assert.assertTrue(t.get(1).toString().equals("a"));
            count++;
        }
        Assert.assertEquals(count, 6);
        f.delete();
    }

    @Test
    public void testMapLookup() throws Exception {
        DataBag b = BagFactory.getInstance().newDefaultBag();
        Map<String, Object> colors = new HashMap<String, Object>();
        colors.put("apple","red");
        colors.put("orange","orange");

        Map<String, Object> weights = new HashMap<String, Object>();
        weights.put("apple","0.1");
        weights.put("orange","0.3");

        Tuple t = mTf.newTuple();
        t.append(colors);
        t.append(weights);
        b.add(t);

        File tmpFile = File.createTempFile("tmp", "");
        tmpFile.delete(); // we only needed the temp file name, so delete the file
        String fileName = Util.removeColon(tmpFile.getAbsolutePath());

        PigFile f = new PigFile(fileName);
        f.store(b, new FuncSpec(BinStorage.class.getCanonicalName()),
                pigServer.getPigContext());

        pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();");
        pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
        Iterator<Tuple> iter = pigServer.openIterator("b");
        t = iter.next();
        Assert.assertEquals(t.get(0).toString(), "red");
        Assert.assertEquals(0.3d, DataType.toDouble(t.get(1)).doubleValue(), 0.0d);
        Assert.assertFalse(iter.hasNext());
        Util.deleteFile(cluster, fileName);
    }

    static public class TitleNGrams extends EvalFunc<DataBag> {

        @Override
        public DataBag exec(Tuple input) throws IOException {
            try {
                DataBag output = BagFactory.getInstance().newDefaultBag();
                String str = input.get(0).toString();

                String title = str;

                if (title != null) {
                    List<String> nGrams = makeNGrams(title);

                    for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) {
                        Tuple t = TupleFactory.getInstance().newTuple(1);
                        t.set(0, it.next());
                        output.add(t);
                    }
                }

                return output;
            } catch (ExecException ee) {
                IOException ioe = new IOException(ee.getMessage());
                ioe.initCause(ee);
                throw ioe;
            }
        }


        List<String> makeNGrams(String str) {
            List<String> tokens = new ArrayList<String>();

            StringTokenizer st = new StringTokenizer(str);
            while (st.hasMoreTokens())
                tokens.add(st.nextToken());

            return nGramHelper(tokens, new ArrayList<String>());
        }

        ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) {
            if (str.size() == 0)
                return nGrams;

            for (int i = 0; i < str.size(); i++)
                nGrams.add(makeString(str.subList(0, i+1)));

            return nGramHelper(str.subList(1, str.size()), nGrams);
        }

        String makeString(List<String> list) {
            StringBuffer sb = new StringBuffer();
            for (Iterator<String> it = list.iterator(); it.hasNext(); ) {
                sb.append(it.next());
                if (it.hasNext())
                    sb.append(" ");
            }
            return sb.toString();
        }

        @Override
        public Schema outputSchema(Schema input) {
            try {
            Schema stringSchema = new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
            Schema.FieldSchema fs = new Schema.FieldSchema(null, stringSchema, DataType.BAG);
            return new Schema(fs);
            } catch (Exception e) {
                return null;
            }
        }
    }

    static public class MapUDF extends EvalFunc<Map<String, Object>> {
        @Override
        public Map<String, Object> exec(Tuple input) throws IOException {

            super.reporter.progress();

            TupleFactory tupleFactory = TupleFactory.getInstance();
            ArrayList<Object> objList = new ArrayList<Object>();
            objList.add(new Integer(1));
            objList.add(new Double(1.0));
            objList.add(new Float(1.0));
            objList.add(new String("World!"));
            Tuple tuple = tupleFactory.newTuple(objList);

            BagFactory bagFactory = BagFactory.getInstance();
            DataBag bag = bagFactory.newDefaultBag();
            bag.add(tuple);

            Map<String, Object> mapInMap = new HashMap<String, Object>();
            mapInMap.put("int", new Integer(10));
            mapInMap.put("float", new Float(10.0));

            Map<String, Object> myMap = new HashMap<String, Object>();
            myMap.put("string", new String("Hello"));
            myMap.put("int", new Integer(1));
            myMap.put("long", new Long(1));
            myMap.put("float", new Float(1.0));
            myMap.put("double", new Double(1.0));
            myMap.put("dba", new DataByteArray(new String("1234").getBytes()));
            myMap.put("map", mapInMap);
            myMap.put("tuple", tuple);
            myMap.put("bag", bag);
            return myMap;
        }

        @Override
        public Schema outputSchema(Schema input) {
            return new Schema(new Schema.FieldSchema(null, DataType.MAP));
        }
    }

    @Test
    public void testBagFunctionWithFlattening() throws Exception{
        File queryLogFile = Util.createFile(
                    new String[]{
                        "stanford\tdeer\tsighting",
                        "bush\tpresident",
                        "stanford\tbush",
                        "conference\tyahoo",
                        "world\tcup\tcricket",
                        "bush\twins",
                        "stanford\tpresident",
                    }
                );

        File newsFile = Util.createFile(
                    new String[]{
                        "deer seen at stanford",
                        "george bush visits stanford",
                        "yahoo hosting a conference in the bay area",
                        "who will win the world cup"
                    }
                );

        Map<String, Integer> expectedResults = new HashMap<String, Integer>();
        expectedResults.put("bush", 2);
        expectedResults.put("stanford", 3);
        expectedResults.put("world", 1);
        expectedResults.put("conference", 1);

        pigServer.registerQuery("newsArticles = LOAD '"
                + Util.generateURI(newsFile.toString(), pigContext)
                + "' USING " + TextLoader.class.getName() + "();");
        pigServer.registerQuery("queryLog = LOAD '"
                + Util.generateURI(queryLogFile.toString(), pigContext) + "';");

        pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
        pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;");
        pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");

        Iterator<Tuple> iter = pigServer.openIterator("answer");
        if(!iter.hasNext()) Assert.fail("No Output received");
        while(iter.hasNext()){
            Tuple t = iter.next();
            Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),
                    (DataType.toDouble(t.get(0))).doubleValue(), 0.0d);
        }
    }

    @Test
    public void testSort() throws Exception{
        testSortDistinct(false);
    }

    @Test
    public void testDistinct() throws Exception{
        testSortDistinct(true);
    }

    private void testSortDistinct(boolean eliminateDuplicates) throws Exception{
        int LOOP_SIZE = 1024*16;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        Random r = new Random();
        for(int i = 0; i < LOOP_SIZE; i++) {
            ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        if (eliminateDuplicates){
            pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
        }else{
            pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
        }
        Iterator<Tuple> iter = pigServer.openIterator("B");
        String last = "";
        HashSet<Integer> seen = new HashSet<Integer>();
        if(!iter.hasNext()) Assert.fail("No Results obtained");
        while (iter.hasNext()){
            Tuple t = iter.next();
            if (eliminateDuplicates){
                Integer act = Integer.parseInt(t.get(0).toString());
                Assert.assertFalse(seen.contains(act));
                seen.add(act);
            }else{
                Assert.assertTrue(last.compareTo(t.get(0).toString())<=0);
                Assert.assertEquals(t.size(), 2);
                last = t.get(0).toString();
            }
        }
    }

    @Test
    public void testNestedPlan() throws Exception{
        int LOOP_COUNT = 10;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = group A by $0;");
        String query = "C = foreach B {"
        + "C1 = filter A by $0 > -1;"
        + "C2 = distinct C1;"
        + "C3 = distinct A;"
        + "generate (int)group," + Identity.class.getName() +"(*), COUNT(C2), SUM(C2.$1)," +  TitleNGrams.class.getName() + "(C3), MAX(C3.$1), C2;"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        List<Tuple> actualResList = new ArrayList<Tuple>();
        while(iter.hasNext()){
            actualResList.add(iter.next());
        }

        Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));

        int numIdentity = 0;
        for (Tuple t : actualResList) {
            Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
            Assert.assertEquals((Long)5L, (Long)t.get(2));
            Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
            Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
            Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
            Assert.assertEquals(7, t.size());
            ++numIdentity;
        }
        Assert.assertEquals(LOOP_COUNT, numIdentity);
    }

    @Test
    public void testNestedPlanWithExpressionAssignment() throws Exception{
        int LOOP_COUNT = 10;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = group A by $0;");
        String query = "C = foreach B {"
        + "C1 = filter A by $0 > -1;"
        + "C2 = distinct C1;"
        + "C3 = distinct A;"
        + "C4 = " + Identity.class.getName() + "(*);"
        + "C5 = COUNT(C2);"
        + "C6 = SUM(C2.$1);"
        + "C7 = " + TitleNGrams.class.getName() + "(C3);"
        + "C8 = MAX(C3.$1);"
        + "generate (int)group, C4, C5, C6, C7, C8, C2;"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        List<Tuple> actualResList = new ArrayList<Tuple>();
        while(iter.hasNext()){
            actualResList.add(iter.next());
        }

        Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));

        int numIdentity = 0;
        for (Tuple t : actualResList) {
            Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
            Assert.assertEquals((Long)5L, (Long)t.get(2));
            Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
            Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
            Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
            Assert.assertEquals(7, t.size());
            ++numIdentity;
        }
        Assert.assertEquals(LOOP_COUNT, numIdentity);
    }

    @Test
    public void testLimit() throws Exception{
        int LOOP_COUNT = 20;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            ps.println(i);
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = limit A 5;");
        Iterator<Tuple> iter = pigServer.openIterator("B");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numIdentity = 0;
        while(iter.hasNext()){
            iter.next();
            ++numIdentity;
        }
        Assert.assertEquals(5, numIdentity);
    }

    @Test
    public void testComplexData() throws IOException, ExecException {
        // Create input file with ascii data
        File input = Util.createInputFile("tmp", "",
                new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});

        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
        pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
        Iterator<Tuple> it = pigServer.openIterator("b");
        Tuple t = it.next();
        Assert.assertEquals(new Long(2), t.get(0));
        Assert.assertEquals("1", t.get(1).toString());
        Assert.assertEquals("2", t.get(2).toString());
        Assert.assertEquals("value1", t.get(3).toString());
        Assert.assertEquals("value2", t.get(4).toString());

        //test with BinStorage
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
        String output = "/pig/out/TestEvalPipeline-testComplexData";
        pigServer.deleteFile(output);
        pigServer.store("a", output, BinStorage.class.getName());
        pigServer.registerQuery("x = load '" + output +"' using BinStorage() " +
                "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
        pigServer.registerQuery("y = foreach x generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
        it = pigServer.openIterator("y");
        t = it.next();
        Assert.assertEquals(new Long(2), t.get(0));
        Assert.assertEquals("1", t.get(1).toString());
        Assert.assertEquals("2", t.get(2).toString());
        Assert.assertEquals("value1", t.get(3).toString());
        Assert.assertEquals("value2", t.get(4).toString());
    }

    @Test
    public void testBinStorageDetermineSchema() throws IOException, ExecException {
        // Create input file with ascii data
        File input = Util.createInputFile("tmp", "",
                new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});

        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
        pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
        Iterator<Tuple> it = pigServer.openIterator("b");
        Tuple t = it.next();
        Assert.assertEquals(new Long(2), t.get(0));
        Assert.assertEquals(1, t.get(1));
        Assert.assertEquals(2, t.get(2));
        Assert.assertEquals("value1", t.get(3).toString());
        Assert.assertEquals("value2", t.get(4).toString());

        //test with BinStorage
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
        String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
        pigServer.deleteFile(output);
        pigServer.store("a", output, BinStorage.class.getName());
        // test with different load specifications
        String[] loads = {"p = load '" + output +"' using BinStorage() " +
                "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);",
                "p = load '" + output +"' using BinStorage() " +
                "as (b, t2, m);",
                "p = load '" + output +"' using BinStorage() ;"};
        // the corresponding generate statements
        String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b as t2b, m#'key1', m#'key2', b;",
                "q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
                "q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};

        for (int i = 0; i < loads.length; i++) {
            pigServer.registerQuery(loads[i]);
            pigServer.registerQuery(generates[i]);
            it = pigServer.openIterator("q");
            t = it.next();
            Assert.assertEquals(new Long(2), t.get(0));
            Assert.assertEquals(Integer.class, t.get(1).getClass());
            Assert.assertEquals(1, t.get(1));
            Assert.assertEquals(Integer.class, t.get(2).getClass());
            Assert.assertEquals(2, t.get(2));
            Assert.assertEquals("value1", t.get(3).toString());
            Assert.assertEquals("value2", t.get(4).toString());
            Assert.assertEquals(DefaultDataBag.class, t.get(5).getClass());
            DataBag bg = (DataBag)t.get(5);
            for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
                Tuple bt = bit.next();
                Assert.assertEquals(String.class, bt.get(0).getClass());
                Assert.assertEquals(String.class, bt.get(1).getClass());
            }
        }
    }

    @Test
    public void testProjectBag() throws IOException, ExecException {
        // This tests make sure that when a bag with multiple columns is
        // projected all columns apear in the output
        File input = Util.createInputFile("tmp", "",
                new String[] {"f1\tf2\tf3"});
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' as (x, y, z);");
        pigServer.registerQuery("b = group a by x;");
        pigServer.registerQuery("c = foreach b generate flatten(a.(y, z));");
        Iterator<Tuple> it = pigServer.openIterator("c");
        Tuple t = it.next();
        Assert.assertEquals(2, t.size());
        Assert.assertEquals("f2", t.get(0).toString());
        Assert.assertEquals("f3", t.get(1).toString());
    }

    @Test
    public void testBinStorageDetermineSchema2() throws IOException, ExecException {
        // Create input file with ascii data
        File input = Util.createInputFile("tmp", "",
                new String[] {"pigtester\t10\t1.2"});

        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (name:chararray, age:int, gpa:double);");
        String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
        pigServer.deleteFile(output);
        pigServer.store("a", output, BinStorage.class.getName());
        // test with different load specifications
        String[] loads = {"p = load '" + output +"' using BinStorage() " +
                "as (name:chararray, age:int, gpa:double);",
                "p = load '" + output +"' using BinStorage() " +
                "as (name, age, gpa);",
                "p = load '" + output +"' using BinStorage() ;"};
        // the corresponding generate statements
        String[] generates = {"q = foreach p generate name, age, gpa;",
                "q = foreach p generate name, age, gpa;",
                "q = foreach p generate $0, $1, $2;"};

        for (int i = 0; i < loads.length; i++) {
            pigServer.registerQuery(loads[i]);
            pigServer.registerQuery(generates[i]);
            Iterator<Tuple> it = pigServer.openIterator("q");
            Tuple t = it.next();
            Assert.assertEquals("pigtester", t.get(0));
            Assert.assertEquals(String.class, t.get(0).getClass());
            Assert.assertEquals(10, t.get(1));
            Assert.assertEquals(Integer.class, t.get(1).getClass());
            Assert.assertEquals(1.2, t.get(2));
            Assert.assertEquals(Double.class, t.get(2).getClass());
        }

        // test that valid casting is allowed
        pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
                " as (name, age:long, gpa:float);");
        pigServer.registerQuery("q = foreach p generate name, age, gpa;");
        Iterator<Tuple> it = pigServer.openIterator("q");
        Tuple t = it.next();
        Assert.assertEquals("pigtester", t.get(0));
        Assert.assertEquals(String.class, t.get(0).getClass());
        Assert.assertEquals(10L, t.get(1));
        Assert.assertEquals(Long.class, t.get(1).getClass());
        Assert.assertEquals(1.2f, t.get(2));
        Assert.assertEquals(Float.class, t.get(2).getClass());

        // test that implicit casts work
        pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
        " as (name, age, gpa);");
        pigServer.registerQuery("q = foreach p generate name, age + 1L, (int)gpa;");
        it = pigServer.openIterator("q");
        t = it.next();
        Assert.assertEquals("pigtester", t.get(0));
        Assert.assertEquals(String.class, t.get(0).getClass());
        Assert.assertEquals(11L, t.get(1));
        Assert.assertEquals(Long.class, t.get(1).getClass());
        Assert.assertEquals(1, t.get(2));
        Assert.assertEquals(Integer.class, t.get(2).getClass());
    }

    @Test
    public void testCogroupWithInputFromGroup() throws IOException, ExecException {
        // Create input file with ascii data
        File input = Util.createInputFile("tmp", "",
                new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2",
                "pigtester2\t10\t1.2",
                "pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});

        Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
        // we will in essence be doing a group on first column and getting
        // SUM over second column and a count for the group - store
        // the results for the three groups above so we can check the output
        resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
        resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
        resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));

        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (name:chararray, age:int, gpa:double);");
        pigServer.registerQuery("b = group a by name;");
        pigServer.registerQuery("c = load '"
                + Util.generateURI(input.toString(), pigContext) + "' using PigStorage() "
                + "as (name:chararray, age:int, gpa:double);");
        pigServer.registerQuery("d = cogroup b by group, c by name;");
        pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
        Iterator<Tuple> it = pigServer.openIterator("e");
        for(int i = 0; i < resultMap.size(); i++) {
            Tuple t = it.next();
            Assert.assertEquals(true, resultMap.containsKey(t.get(0)));
            Pair<Long, Long> output = resultMap.get(t.get(0));
            Assert.assertEquals(output.first, t.get(1));
            Assert.assertEquals(output.second, t.get(2));
        }
    }

    @Test
    public void testUtf8Dump() throws IOException, ExecException {

        // Create input file with unicode data
        File input = Util.createInputFile("tmp", "",
                new String[] {"wendyξ"});
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext)
                + "' using PigStorage() " + "as (name:chararray);");
        Iterator<Tuple> it = pigServer.openIterator("a");
        Tuple t = it.next();
        Assert.assertEquals("wendyξ", t.get(0));

    }

    @SuppressWarnings("unchecked")
    @Test
    public void testMapUDF() throws Exception{
        int LOOP_COUNT = 2;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
        String query = "C = foreach B {"
        + "generate (double)mymap#'double' as d, (long)mymap#'long' + (float)mymap#'float' as float_sum, CONCAT((chararray) mymap#'string', ' World!'), mymap#'int' * 10, (bag{tuple()}) mymap#'bag' as mybag, (tuple()) mymap#'tuple' as mytuple, (map[])mymap#'map' as mapInMap, mymap#'dba' as dba;"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numIdentity = 0;
        while(iter.hasNext()){
            Tuple t = iter.next();
            Assert.assertEquals(1.0, (Double)t.get(0), 0.01);
            Assert.assertEquals(2.0, (Float)t.get(1), 0.01);
            Assert.assertTrue(((String)t.get(2)).equals("Hello World!"));
            Assert.assertEquals(new Integer(10), (Integer)t.get(3));
            Assert.assertEquals(1, ((DataBag)t.get(4)).size());
            Assert.assertEquals(4, ((Tuple)t.get(5)).size());
            Assert.assertEquals(2, ((Map<String, Object>)t.get(6)).size());
            Assert.assertEquals(DataByteArray.class, t.get(7).getClass());
            Assert.assertEquals(8, t.size());
            ++numIdentity;
        }
        Assert.assertEquals(LOOP_COUNT * LOOP_COUNT, numIdentity);
    }

    @Test
    public void testMapUDFWithImplicitTypeCast() throws Exception{
        int LOOP_COUNT = 2;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            ps.println(i);
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
        String query = "C = foreach B generate mymap#'dba' * 10; ";

        pigServer.registerQuery(query);

        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numIdentity = 0;
        while(iter.hasNext()){
            Tuple t = iter.next();
            Assert.assertEquals(new Integer(12340), (Integer)t.get(0));
            ++numIdentity;
        }
        Assert.assertEquals(LOOP_COUNT, numIdentity);
    }

    @Test
    public void testLoadCtorArgs() throws IOException, ExecException {

        // Create input file
        File input = Util.createInputFile("tmp", "",
                new String[] {"hello:world"});
        pigServer.registerQuery("a = load '"
                + Util.generateURI(input.toString(), pigContext)
                + "' using org.apache.pig.test.PigStorageNoDefCtor(':');");
        pigServer.registerQuery("b = foreach a generate (chararray)$0, (chararray)$1;");
        Iterator<Tuple> it = pigServer.openIterator("b");
        Tuple t = it.next();
        Assert.assertEquals("hello", t.get(0));
        Assert.assertEquals("world", t.get(1));

    }

    @Test
    public void testNestedPlanForCloning() throws Exception{
        int LOOP_COUNT = 10;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = group A by $0;");
        String query = "C = foreach B {"
        + "C1 = filter A by not($0 <= -1);"
        + "C2 = distinct C1;"
        + "C3 = distinct A;"
        + "C4 = order A by $0;"
        + "generate (group + 1) * 10, COUNT(C4), COUNT(C2), SUM(C2.$1)," +  TitleNGrams.class.getName() + "(C3), MAX(C3.$1), C2;"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        List<Tuple> actualResList = new ArrayList<Tuple>();
        while(iter.hasNext()){
            actualResList.add(iter.next());
        }

        Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));

        int numIdentity = 0;
        for (Tuple t : actualResList) {
            Assert.assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0));
            Assert.assertEquals((Long)10L, (Long)t.get(1));
            Assert.assertEquals((Long)5L, (Long)t.get(2));
            Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
            Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
            Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
            Assert.assertEquals(7, t.size());
            ++numIdentity;
        }
        Assert.assertEquals(LOOP_COUNT, numIdentity);
    }

    @Test
    public void testArithmeticCloning() throws Exception{
        int LOOP_COUNT = 10;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = distinct A;");
        if (Util.isSparkExecType(cluster.getExecType())) {
            pigServer.registerQuery("B = order B by *;");
        }

        String query = "C = foreach B {"
        + "C1 = $1 - $0;"
        + "C2 = $1%2;"
        + "C3 = ($1 == 0? 0 : $0/$1);"
        + "generate C1, C2, C3;"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numRows = 0;
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j = 0; j < LOOP_COUNT; j+=2){
                Tuple t = null;
                if(iter.hasNext()) t = iter.next();
                Assert.assertEquals(3, t.size());
                Assert.assertEquals(new Double(j - i), (Double)t.get(0), 0.01);
                Assert.assertEquals((Integer)(j%2), (Integer)t.get(1));
                if(j == 0) {
                    Assert.assertEquals(0.0, (Double)t.get(2), 0.01);
                } else {
                    Assert.assertEquals((Double)((double)i/j), (Double)t.get(2), 0.01);
                }
                ++numRows;
            }
        }

        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
    }

    @Test
    public void testExpressionReUse() throws Exception{
        int LOOP_COUNT = 10;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = distinct A;");
        if (Util.isSparkExecType(cluster.getExecType())) {
            pigServer.registerQuery("B = order B by *;");
        }

        String query = "C = foreach B {"
        + "C1 = $0 + $1;"
        + "C2 = C1 + $0;"
        + "generate C1, C2;"
        + "};";

        pigServer.registerQuery(query);
        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numRows = 0;
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j = 0; j < LOOP_COUNT; j+=2){
                Tuple t = null;
                if(iter.hasNext()) t = iter.next();
                Assert.assertEquals(2, t.size());
                Assert.assertEquals(new Double(i + j), (Double)t.get(0), 0.01);
                Assert.assertEquals(new Double(i + j + i), (Double)t.get(1));
                ++numRows;
            }
        }

        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
    }

    @Test
    public void testIdentity() throws Exception{
        int LOOP_COUNT = 2;
        File tmpFile = Util.createTempFileDelOnExit("test", "txt");
        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j=0;j<LOOP_COUNT;j+=2){
                ps.println(i+"\t"+j);
                ps.println(i+"\t"+j);
            }
        }
        ps.close();

        pigServer.registerQuery("A = LOAD '"
                + Util.generateURI(tmpFile.toString(), pigContext) + "';");
        pigServer.registerQuery("B = distinct A ;"); //the argument does not matter
        pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter

        Iterator<Tuple> iter = pigServer.openIterator("C");
        if(!iter.hasNext()) Assert.fail("No output found");
        int numRows = 0;
        for(int i = 0; i < LOOP_COUNT; i++) {
            for(int j = 0; j < LOOP_COUNT; j+=2){
                Tuple t = null;
                if(iter.hasNext()) t = iter.next();
                Assert.assertEquals(2, t.size());
                Assert.assertEquals(new Double(i), new Double(t.get(0).toString()), 0.01);
                Assert.assertEquals(new Double(j), new Double(t.get(1).toString()), 0.01);
                ++numRows;
            }
        }

        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
    }

    @Test
    public void testCogroupAfterDistinct() throws Exception {
        String[] input1 = {
                "abc",
                "abc",
                "def",
                "def",
                "def",
                "abc",
                "def",
                "ghi"
                };
        String[] input2 = {
            "ghi	4",
            "rst	12344",
            "uvw	1",
            "xyz	4141"
            };
        Util.createInputFile(cluster, "table1", input1);
        Util.createInputFile(cluster, "table2", input2);

        pigServer.registerQuery("nonuniqtable1 = LOAD 'table1' AS (f1:chararray);");
        pigServer.registerQuery("table1 = DISTINCT nonuniqtable1;");
        pigServer.registerQuery("table2 = LOAD 'table2' AS (f1:chararray, f2:int);");
        pigServer.registerQuery("temp = COGROUP table1 BY f1 INNER, table2 BY f1;");
        Iterator<Tuple> it = pigServer.openIterator("temp");

        // results should be:
        // (abc,{(abc)},{})
        // (def,{(def)},{})
        // (ghi,{(ghi)},{(ghi,4)})
        HashMap<String, Tuple> results = new HashMap<String, Tuple>();
        Object[] row = new Object[] { "abc",
                Util.createBagOfOneColumn(new String[] { "abc"}), mBf.newDefaultBag() };
        results.put("abc", Util.createTuple(row));
        row = new Object[] { "def",
                Util.createBagOfOneColumn(new String[] { "def"}), mBf.newDefaultBag() };
        results.put("def", Util.createTuple(row));
        Object[] thirdColContents = new Object[] { "ghi", 4 };
        Tuple t = Util.createTuple(thirdColContents);
        row = new Object[] { "ghi",
                Util.createBagOfOneColumn(new String[] { "ghi"}), Util.createBag(new Tuple[] { t })};
        results.put("ghi", Util.createTuple(row));

        while(it.hasNext()) {
            Tuple tup = it.next();
            List<Object> fields = tup.getAll();
            Tuple expected = results.get((String)fields.get(0));
            int i = 0;
            for (Object field : fields) {
                Assert.assertEquals(expected.get(i++), field);
            }
        }

        Util.deleteFile(cluster, "table1");
        Util.deleteFile(cluster, "table2");
    }

    @Test
    public void testAlgebraicDistinctProgress() throws Exception {

        //creating a test input of larger than 1000 to make
        //sure that progress kicks in. The only way to test this
        //is to add a log statement to the getDistinct
        //method in Distinct.java. There is no automated mechanism
        //to check this from pig
        int inputSize = 4004;
        Integer[] inp = new Integer[inputSize];
        String[] inpString = new String[inputSize];
        for(int i = 0; i < inputSize; i+=2) {
            inp[i] = i/2;
            inp[i+1] = i/2;
            inpString[i] = new Integer(i/2).toString();
            inpString[i+1] = new Integer(i/2).toString();
        }

        Util.createInputFile(cluster, "table", inpString);
        StringWriter writer = new StringWriter();
        if (cluster.getExecType().name().equals("TEZ")) {
            Util.createLogAppender("testNoCombinerInReducer", writer, Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
        }

        pigServer.registerQuery("a = LOAD 'table' AS (i:int);");
        pigServer.registerQuery("b = group a ALL;");
        pigServer.registerQuery("c = foreach b {aa = DISTINCT a; generate COUNT(aa);};");
        Iterator<Tuple> it = pigServer.openIterator("c");

        Integer[] exp = new Integer[inputSize/2];
        for(int j = 0; j < inputSize/2; ++j) {
            exp[j] = j;
        }

        DataBag expectedBag = Util.createBagOfOneColumn(exp);

        while(it.hasNext()) {
            Tuple tup = it.next();
            Long resultBagSize = (Long)tup.get(0);
            Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
        }

        if (cluster.getExecType().name().equals("TEZ")) {
            Assert.assertTrue(writer.toString().contains("Turning off combiner in reducer"));
            Util.removeLogAppender("testNoCombinerInReducer", Class.forName("org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder"));
        }
        Util.deleteFile(cluster, "table");
    }

    @Test
    public void testBinStorageWithLargeStrings() throws Exception {
        // Create input file with large strings
        int testSize = 100;
        String[] stringArray = new String[testSize];
        Random random = new Random();
        stringArray[0] = GenRandomData.genRandLargeString(random, 65534);
        for(int i = 1; i < stringArray.length; ++i) {
            //generate a few large strings every 25th record
            if((i % 25) == 0) {
                stringArray[i] = GenRandomData.genRandLargeString(random, 65535 + i);
            } else {
                stringArray[i] = GenRandomData.genRandString(random);
            }
        }

        Util.createInputFile(cluster, "table", stringArray);

        //test with BinStorage
        pigServer.registerQuery("a = load 'table' using PigStorage() " +
                "as (c: chararray);");
        String output = "/pig/out/TestEvalPipeline-testBinStorageLargeStrings";
        pigServer.deleteFile(output);
        pigServer.store("a", output, BinStorage.class.getName());

        pigServer.registerQuery("b = load '" + output +"' using BinStorage() " +
                "as (c:chararray);");
        pigServer.registerQuery("c = foreach b generate c;");

        Iterator<Tuple> it = pigServer.openIterator("c");
        int counter = 0;
        while(it.hasNext()) {
            Tuple tup = it.next();
            String resultString = (String)tup.get(0);
            String expectedString = stringArray[counter];
            Assert.assertTrue(expectedString.equals(resultString));
            ++counter;
        }
        Util.deleteFile(cluster, "table");
    }

}
