/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.junit.Before;
import org.junit.Test;

public class TestScalarAliasesLocal {
    private static final String BUILD_TEST_TMP = "build/test/tmp/";
    private PigServer pigServer;

    TupleFactory mTf = TupleFactory.getInstance();
    BagFactory mBf = BagFactory.getInstance();

    @Before
    public void setUp() throws Exception{
        pigServer = new PigServer(Util.getLocalTestMode());
    }

    public static void deleteDirectory(File file) {
        if (file.exists()) {
            Util.deleteDirectory(file);
        }
    }

    public static File createLocalInputFile(String filename, String[] inputData)
            throws IOException {
        new File(filename).getParentFile().mkdirs();
        return Util.createLocalInputFile(filename, inputData);
    }

    // See PIG-1434
    @Test
    public void testScalarAliasesBatchNobatch() throws Exception{
        String[] input = {
                "1\t5",
                "2\t10",
                "3\t20"
        };

        String output = BUILD_TEST_TMP+"table_testScalarAliasesDir";
        TestScalarAliases.deleteDirectory(new File(output));
        // Test the use of scalars in expressions
        String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesBatch";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        // Test in script mode
        pigServer.setBatchOn();
        pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);");
        pigServer.registerQuery("B = group A all;");
        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
        pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);");
        pigServer.registerQuery("Store Y into '" + output + "';");
        pigServer.executeBatch();
        // Check output
        pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);");

        Iterator<Tuple> iter;
        Tuple t;
        iter = pigServer.openIterator("Z");

        t = iter.next();
        assertTrue(t.toString().equals("(3,0.25)"));

        t = iter.next();
        assertTrue(t.toString().equals("(6,0.5)"));

        t = iter.next();
        assertTrue(t.toString().equals("(9,1.0)"));

        assertFalse(iter.hasNext());

        iter = pigServer.openIterator("Y");

        t = iter.next();
        assertTrue(t.toString().equals("(3,0.25)"));

        t = iter.next();
        assertTrue(t.toString().equals("(6,0.5)"));

        t = iter.next();
        assertTrue(t.toString().equals("(9,1.0)"));

        assertFalse(iter.hasNext());


    }

    // See PIG-1434
    @Test
    public void testUseScalarMultipleTimes() throws Exception{
        String[] input = {
                "1\t5",
                "2\t10",
                "3\t20"
        };

        String outputY = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutY";
        TestScalarAliases.deleteDirectory(new File(outputY));
        String outputZ = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutZ";
        TestScalarAliases.deleteDirectory(new File(outputZ));
        // Test the use of scalars in expressions
        String inputPath = BUILD_TEST_TMP+"table_testUseScalarMultipleTimes";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        pigServer.setBatchOn();
        pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);");
        pigServer.registerQuery("B = group A all;");
        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
        pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);");
        pigServer.registerQuery("Store Y into '" + outputY + "';");
        pigServer.registerQuery("Z = foreach A generate (a1 + C.count), (a0 * C.max);");
        pigServer.registerQuery("Store Z into '" + outputZ + "';");
        // Test Multiquery store
        pigServer.executeBatch();

        // Check output
        pigServer.registerQuery("M = LOAD '" + outputY + "' as (a0: int, a1: double);");

        Iterator<Tuple> iter;
        Tuple t;
        iter = pigServer.openIterator("M");

        t = iter.next();
        assertTrue(t.toString().equals("(3,0.25)"));

        t = iter.next();
        assertTrue(t.toString().equals("(6,0.5)"));

        t = iter.next();
        assertTrue(t.toString().equals("(9,1.0)"));

        assertFalse(iter.hasNext());

        // Check output
        pigServer.registerQuery("N = LOAD '" + outputZ + "' as (a0: double, a1: double);");

        iter = pigServer.openIterator("N");

        t = iter.next();
        assertTrue(t.toString().equals("(8.0,20.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(13.0,40.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(23.0,60.0)"));

        assertFalse(iter.hasNext());

        // Non batch mode
        iter = pigServer.openIterator("Y");

        t = iter.next();
        assertTrue(t.toString().equals("(3,0.25)"));

        t = iter.next();
        assertTrue(t.toString().equals("(6,0.5)"));

        t = iter.next();
        assertTrue(t.toString().equals("(9,1.0)"));

        assertFalse(iter.hasNext());

        // Check in non-batch mode
        iter = pigServer.openIterator("Z");

        t = iter.next();
        assertTrue(t.toString().equals("(8.0,20.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(13.0,40.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(23.0,60.0)"));

        assertFalse(iter.hasNext());


    }

    // See PIG-1434
    @Test
    public void testScalarWithNoSchema() throws Exception{
        String[] scalarInput = {
                "1\t5"
        };
        String[] input = {
                "1\t5",
                "2\t10",
                "3\t20"
        };
        String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchema";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaScalar";
        TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput);
        // Load A as a scalar
        pigServer.registerQuery("A = LOAD '" + inputPath + "';");
        pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "' as (count, total);");
        pigServer.registerQuery("B = foreach A generate 5 / scalar.total;");

        Iterator<Tuple> iter = pigServer.openIterator("B");

        Tuple t = iter.next();
        assertTrue(t.get(0).toString().equals("1"));

        t = iter.next();
        assertTrue(t.get(0).toString().equals("1"));

        t = iter.next();
        assertTrue(t.get(0).toString().equals("1"));

        assertFalse(iter.hasNext());

    }

    // See PIG-1434
    @Test
    public void testScalarWithTwoBranches() throws Exception{
        String[] inputA = {
                "1\t5",
                "2\t10",
                "3\t20"
        };

        String[] inputX = {
                "pig",
                "hadoop",
                "rocks"
        };

        String output = BUILD_TEST_TMP+"testScalarWithTwoBranchesDir";
        TestScalarAliases.deleteDirectory(new File(output));
        // Test the use of scalars in expressions
        String inputPathA = BUILD_TEST_TMP+"testScalarWithTwoBranchesA";
        TestScalarAliases.createLocalInputFile(inputPathA, inputA);
        String inputPathX = BUILD_TEST_TMP+"testScalarWithTwoBranchesX";
        TestScalarAliases.createLocalInputFile(inputPathX, inputX);
        // Test in script mode
        pigServer.setBatchOn();
        pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0: long, a1: double);");
        pigServer.registerQuery("B = group A all;");
        pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;");
        pigServer.registerQuery("X = LOAD '" + inputPathX + "' as (names: chararray);");
        pigServer.registerQuery("Y = foreach X generate names, C.max;");
        pigServer.registerQuery("Store Y into '" + output + "';");
        pigServer.executeBatch();
        // Check output
        pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: chararray, a1: double);");

        Iterator<Tuple> iter = pigServer.openIterator("Z");

        Tuple t = iter.next();
        assertTrue(t.toString().equals("(pig,20.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(hadoop,20.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(rocks,20.0)"));

        assertFalse(iter.hasNext());

        // Check in non-batch mode
        iter = pigServer.openIterator("Y");

        t = iter.next();
        assertTrue(t.toString().equals("(pig,20.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(hadoop,20.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(rocks,20.0)"));

        assertFalse(iter.hasNext());

        pigServer.getPigContext().getProperties().remove("tez.am.inline.task.execution.max-tasks");

    }

    // See PIG-1434
    @Test
    public void testFilteredScalarDollarProj() throws Exception{
        String output = BUILD_TEST_TMP+"table_testFilteredScalarDollarProjDir";
        TestScalarAliases.deleteDirectory(new File(output));
        String[] input = {
                "1\t5\t[state#maine,city#portland]\t{(a),(b)}\t(a,b)",
                "2\t10\t\t\t",
                "3\t20\t\t\t"
        };

        // Test the use of scalars in expressions
        String inputPath = BUILD_TEST_TMP+"table_testFilteredScalarDollarProj";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        // Test in script mode
        pigServer.setBatchOn();
        pigServer.registerQuery("A = LOAD '" + inputPath + "'" + " as (a0: long, a1: double, a2 : bytearray, " + "a3: bag{ t : tuple(tc : chararray)}, " + "a4: tuple(c1 : chararray, c2 : chararray) );");
        pigServer.registerQuery("B = filter A by $1 < 8;");
        pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / B.$1), B.$2, B.$2#'state', B.$3, B.a4;");
        pigServer.registerQuery("Store Y into '" + output + "';");
        pigServer.explain("Y", System.err);
        pigServer.executeBatch();
        // Check output
        pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);");
        pigServer.explain("Z", System.err);

        Iterator<Tuple> iter = pigServer.openIterator("Z");

        Tuple t = iter.next();
        assertTrue(t.toString().equals("(1,1.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(2,2.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(3,4.0)"));

        assertFalse(iter.hasNext());

        // Check in non-batch mode
        iter = pigServer.openIterator("Y");

        t = iter.next();
        assertEquals(t.toString(),"(1,1.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))");

        t = iter.next();
        assertEquals(t.toString(),"(2,2.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))");

        t = iter.next();
        assertEquals(t.toString(),"(3,4.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))");

        assertFalse(iter.hasNext());


    }

    // See PIG-1434
    @Test
    public void testScalarWithNoSchemaDollarProj() throws Exception{
        String[] scalarInput = {
                "1\t5"
        };
        String[] input = {
                "1\t5",
                "2\t10",
                "3\t20"
        };
        String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProj";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProjScalar";
        TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput);
        // Load A as a scalar
        pigServer.registerQuery("A = LOAD '" + inputPath + "';");
        pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "';");
        pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;");

        Iterator<Tuple> iter = pigServer.openIterator("B");

        Tuple t = iter.next();
        assertTrue(t.get(0).toString().equals("1"));

        t = iter.next();
        assertTrue(t.get(0).toString().equals("1"));

        t = iter.next();
        assertTrue(t.get(0).toString().equals("1"));

        assertFalse(iter.hasNext());

    }

    // See PIG-1434
    @Test
    public void testScalarAliasesJoinClause() throws Exception{
        String[] inputA = {
                "1\t5",
                "2\t10",
                "3\t20"
        };
        String[] inputB = {
                "Total3\tthree",
                "Total2\ttwo",
                "Total1\tone"
        };

        // Test the use of scalars in expressions
        String inputPathA = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseA";
        TestScalarAliases.createLocalInputFile(inputPathA, inputA);
        String inputPathB = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseB";
        TestScalarAliases.createLocalInputFile(inputPathB, inputB);
        // Test in script mode
        pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0, a1);");
        pigServer.registerQuery("G = group A all;");
        pigServer.registerQuery("C = foreach G generate COUNT(A) as count;");

        pigServer.registerQuery("B = LOAD '" + inputPathB + "' as (b0:chararray, b1:chararray);");
        pigServer.registerQuery("Y = join A by CONCAT('Total', (chararray)C.count), B by $0;");

        Iterator<Tuple> iter = pigServer.openIterator("Y");

        String[] expected = new String[] {
                "(1,5,Total3,three)",
                "(2,10,Total3,three)",
                "(3,20,Total3,three)"
        };

        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("Y")));
    }

    // See PIG-1434
    @Test
    public void testScalarAliasesFilterClause() throws Exception{
        String[] input = {
                "1\t5",
                "2\t10",
                "3\t20",
                "4\t12",
                "5\t8"
        };

        // Test the use of scalars in expressions
        String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesFilterClause";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        // Test in script mode
        pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0, a1);");
        pigServer.registerQuery("G = group A all;");
        pigServer.registerQuery("C = foreach G generate AVG(A.$1) as average;");

        pigServer.registerQuery("Y = filter A by a1 > C.average;");

        Iterator<Tuple> iter = pigServer.openIterator("Y");

        // Average is 11
        Tuple t = iter.next();
        assertTrue(t.toString().equals("(3,20)"));

        t = iter.next();
        assertTrue(t.toString().equals("(4,12)"));

        assertFalse(iter.hasNext());
    }

    // See PIG-1434
    @Test
    public void testScalarAliasesGrammarNegative() throws Exception{
        String[] input = {
                "1\t5",
                "2\t10",
                "3\t20"
        };

        String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesGrammar";
        TestScalarAliases.createLocalInputFile(inputPath, input);

        try {
            pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);");
            pigServer.registerQuery("B = group A all;");
            pigServer.registerQuery("C = foreach B generate COUNT(A);");
            // Only projections of C are supported
            pigServer.registerQuery("Y = foreach A generate C;");
            pigServer.openIterator( "Y" );
            //Control should not reach here
            fail("Scalar projections are only supported");
        } catch (IOException pe){
            assertTrue(pe.getMessage().contains("Invalid scalar projection: C"));
        }
    }

    // See PIG-1636
    @Test
    public void testScalarAliasesLimit() throws Exception{
        String[] input = {
                "a\t1",
                "b\t2",
                "c\t3",
                "a\t4",
                "c\t5"
        };

        // Test the use of scalars in expressions
        String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesLimit";
        TestScalarAliases.createLocalInputFile(inputPath, input);
        // Test in script mode
        pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0:chararray, a1: int);");
        pigServer.registerQuery("G = group A all;");
        pigServer.registerQuery("C = foreach G generate SUM(A.$1) as total;");
        pigServer.registerQuery("C1 = limit C 1;");
        pigServer.registerQuery("Y = foreach A generate a0, a1 * (double)C1.total;");

        Iterator<Tuple> iter = pigServer.openIterator("Y");

        // Average is 11
        Tuple t = iter.next();
        assertTrue(t.toString().equals("(a,15.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(b,30.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(c,45.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(a,60.0)"));

        t = iter.next();
        assertTrue(t.toString().equals("(c,75.0)"));

        assertFalse(iter.hasNext());
    }

    /**
     * Test that a specific string is included in the error message when an
     * exception is thrown for using a relation in a
     * scalar context without projecting any columns out of it
     */
    // See PIG-1788
    @Test
    public void testScalarWithNoProjection() throws Exception{
        String query =
            "  A = load 'table_testScalarWithNoProjection' as (x, y);" +
            "  B = group A by x;" +
            // B is unintentionally being used as scalar,
            // the user intends it to be COUNT(A)
            "  C = foreach B generate COUNT(B);";

        Util.checkExceptionMessage(query, "C",
                "A column needs to be projected from a relation" +
                " for it to be used as a scalar"
        );
    }

    @Test
    public void testScalarNullValue() throws Exception{
        Storage.Data data = Storage.resetData(pigServer);
        data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2));

        pigServer.setBatchOn();
        pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);");
        pigServer.registerQuery("B = FILTER A by a == 'c';");
        pigServer.registerQuery("C = FOREACH A generate a, b + B.b;");
        pigServer.registerQuery("store C into 'output' using mock.Storage();");

        pigServer.executeBatch();

        List<Tuple> actualResults = data.get("output");
        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                new String[] {"('a', null)", "('b', null)"});
        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);

    }

    @Test
    public void testScalarNullValue2() throws Exception{
        Storage.Data data = Storage.resetData(pigServer);
        data.set("input", Storage.tuple("a", 1), Storage.tuple("b", 2));

        pigServer.registerQuery("A = load 'input' using mock.Storage() as (a:chararray, b:int);");
        pigServer.registerQuery("B = FILTER A by a == 'c';");
        pigServer.registerQuery("C = GROUP B ALL;");
        pigServer.registerQuery("D = FOREACH C GENERATE COUNT(B.b) as count;");
        pigServer.registerQuery("E = FOREACH A GENERATE (D.count IS NOT NULL? D.count : 0l);;");

        Iterator<Tuple> iter = pigServer.openIterator("E");
        Tuple t = iter.next();
        assertTrue(t.toString().equals("(0)"));
    }
}
