/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests PORelationToExprProject, a special project operator
 * introduced to handle the following case:
 * <p>
 * This project is a Project(*) introduced after a relational operator
 * to supply a bag as output (as an expression). The project either
 * provides the bag as input to a successor expression operator or is
 * itself the leaf in an inner plan.
 * If the predecessor relational operator sends an EOP, the project
 * first sends an empty bag to signal "empty" output
 * and then sends an EOP.
 * <p>
 * NOTE: A Project(*) of return type BAG whose predecessor is
 * from an outside plan (i.e. not in the same inner plan as the project)
 * will NOT lead us here. So a query like:
 * a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
 * will go through a regular project (without the special handling described above).
 */
public class TestRelationToExprProject {
    /** Pig server under test; created fresh for every test method in {@link #setUp()}. */
    private PigServer pigServer;

    /** Path of the input file loaded by {@link #testFilterCount3()}. */
    private static final String TEST_FILTER_COUNT3_INPUT = "test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt";

    /**
     * Creates a new {@link PigServer} using the local test mode reported by
     * {@code Util.getLocalTestMode()}.
     */
    @Before
    public void setUp() throws Exception {
        pigServer = new PigServer(Util.getLocalTestMode());
    }

    /**
     * Shuts down the {@link PigServer} created in {@link #setUp()}.
     */
    @After
    public void tearDown() throws Exception {
        // JUnit still runs @After methods when @Before threw, so the server
        // may never have been created; avoid masking that failure with an NPE.
        if (pigServer != null) {
            pigServer.shutdown();
        }
    }

    /**
     * Based on the script provided in the jira issue PIG-514.
     * Tests that when a filter inside a foreach filters away all tuples
     * for a group, an empty bag is still provided to udfs whose
     * input is the filter.
     */
    @Test
    public void testFilterCount1() throws IOException, ParserException {
        Data data = resetData(pigServer);
        data.set("foo", tuple(1,1,3), tuple(1,2,3), tuple(2,1,3), tuple(2,1,3));
        String script = "test   = load 'foo' using mock.Storage() as (col1: int, col2: int, col3: int);" +
                "test2 = group test by col1;" +
                "test3 = foreach test2 {" +
                "        filter_one    = filter test by (col2==1);" +
                "        filter_notone = filter test by (col2!=1);" +
                "        generate group as col1, COUNT(filter_one) as cnt_one, COUNT(filter_notone) as cnt_notone;};";
        pigServer.registerQuery(script);
        Iterator<Tuple> it = pigServer.openIterator("test3");

        // Declared as Tuple[] (not DefaultTuple[]) so that any Tuple
        // implementation can be stored without an ArrayStoreException.
        Tuple[] expected = new Tuple[] {
            (Tuple) Util.getPigConstant("(1,1L,1L)"),
            (Tuple) Util.getPigConstant("(2,2L,0L)")
        };
        Tuple[] results = new Tuple[expected.length];
        int count = 0;
        while (it.hasNext()) {
            if (count == expected.length) {
                fail("Got more tuples than expected!");
            }
            Tuple t = it.next();
            // The test does not rely on the order in which groups are returned:
            // the tuple for group 1 goes to slot 0, anything else to slot 1.
            // Comparing from the constant side keeps a null key from throwing.
            int slot = Integer.valueOf(1).equals(t.get(0)) ? 0 : 1;
            results[slot] = t;
            count++;
        }
        assertEquals("Number of result tuples", expected.length, count);
        for (int j = 0; j < expected.length; j++) {
            assertEquals("Result tuple in slot " + j, expected[j], results[j]);
        }
    }

    /**
     * Based on jira PIG-710.
     * Tests that when a filter inside a foreach filters away all tuples
     * for a group, an empty bag is still provided to udfs whose
     * input is the filter.
     */
    @Test
    public void testFilterCount2() throws IOException, ParserException {
        Data data = resetData(pigServer);
        data.set("foo",
                tuple("a", "hello"),
                tuple("a", "goodbye"),
                tuple("b", "goodbye"),
                tuple("c", "hello"),
                tuple("c", "hello"),
                tuple("c", "hello"),
                tuple("d", "what")
                );
        String query = "A = load 'foo' using mock.Storage() as ( id:chararray, str:chararray );" +
                "B = group A by ( id );" +
                "Cfiltered = foreach B {" +
                "        D = filter A by (" +
                "                str matches 'hello'" +
                "                );" +
                "        matchedcount = COUNT(D);" +
                "        generate" +
                "                group," +
                "                matchedcount as matchedcount," +
                "                A.str;" +
                "        };";
        pigServer.registerQuery(query);
        Iterator<Tuple> it = pigServer.openIterator("Cfiltered");

        // Expected output tuple for each group key.
        Map<String, Tuple> expected = new HashMap<String, Tuple>();
        expected.put("a", (Tuple) Util.getPigConstant("('a',1L,{('hello'),('goodbye')})"));
        expected.put("b", (Tuple) Util.getPigConstant("('b',0L,{('goodbye')})"));
        expected.put("c", (Tuple) Util.getPigConstant("('c',3L,{('hello'),('hello'),('hello')})"));
        expected.put("d", (Tuple) Util.getPigConstant("('d',0L,{('what')})"));

        int count = 0;
        while (it.hasNext()) {
            Tuple actual = it.next();
            Object groupKey = actual.get(0);
            assertEquals("Result for group " + groupKey, expected.get(groupKey), actual);
            count++;
        }
        assertEquals("Number of result tuples", expected.size(), count);
    }

    /**
     * Based on jira PIG-739.
     * Tests that when a filter inside a foreach filters away all tuples
     * for a group, an empty bag is still provided to udfs whose
     * input is the filter.
     */
    @Test
    public void testFilterCount3() throws IOException, ParserException {
        String query = "TESTDATA =  load '"+TEST_FILTER_COUNT3_INPUT+"' using PigStorage() as (timestamp:chararray, testid:chararray, userid: chararray, sessionid:chararray, value:long, flag:int);" +
                "TESTDATA_FILTERED = filter TESTDATA by (timestamp gte '1230800400000' and timestamp lt '1230804000000' and value != 0);" +
                "TESTDATA_GROUP = group TESTDATA_FILTERED by testid;" +
                "TESTDATA_AGG = foreach TESTDATA_GROUP {" +
                "                        A = filter TESTDATA_FILTERED by (userid eq sessionid);" +
                "                        C = distinct A.userid;" +
                "                        generate group as testid, COUNT(TESTDATA_FILTERED) as counttestdata, COUNT(C) as distcount, SUM(TESTDATA_FILTERED.flag) as total_flags;" +
                "                }" +
                "TESTDATA_AGG_1 = group TESTDATA_AGG ALL;" +
                "TESTDATA_AGG_2 = foreach TESTDATA_AGG_1 generate COUNT(TESTDATA_AGG);" ;
        pigServer.registerQuery(query);
        Iterator<Tuple> it = pigServer.openIterator("TESTDATA_AGG_2");

        int count = 0;
        while (it.hasNext()) {
            Tuple actual = it.next();
            assertEquals(Long.valueOf(20L), actual.get(0));
            count++;
        }
        // The final "group ... ALL" must yield exactly one output tuple.
        assertEquals("Number of result tuples", 1, count);
    }

    /**
     * Test case where RelationToExprProject is present in the
     * single inner plan of foreach - this will test that it does
     * send an EOP eventually for each input of the foreach.
     */
    @Test
    public void testFilter1() throws IOException, ParserException {
        Data data = resetData(pigServer);
        data.set("foo", tuple(1,1,3), tuple(1,2,3), tuple(2,1,3), tuple(2,1,3), tuple(3,4,4));

        String script = "test   = load 'foo' using mock.Storage() as (col1: int, col2: int, col3: int);" +
                "test2 = group test by col1;" +
                "test3 = foreach test2 {" +
                "        filter_one    = filter test by (col2==1);" +
                "        generate filter_one;};";
        pigServer.registerQuery(script);
        Iterator<Tuple> it = pigServer.openIterator("test3");

        // Maps each expected tuple to the number of times it has been seen.
        Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
        expected.put((Tuple) Util.getPigConstant("({(1,1,3)})"), 0);
        expected.put((Tuple) Util.getPigConstant("({(2,1,3),(2,1,3)})"), 0);
        // Group 3 has no row with col2==1, so its output is a tuple holding an empty bag.
        Tuple emptyBagTuple = TupleFactory.getInstance().newTuple();
        emptyBagTuple.append(BagFactory.getInstance().newDefaultBag());
        expected.put(emptyBagTuple, 0);

        assertEachExpectedTupleSeenOnce(it, expected);
    }

    /**
     * Test case where RelationToExprProject is present in a
     * different inner plan along with another plan to project the group
     * in foreach - this will test that reset() correctly resets
     * the state that empty bags need to be sent on EOP if no non-EOP
     * input has been seen on a fresh input from foreach.
     */
    @Test
    public void testFilter2() throws IOException, ParserException {
        Data data = resetData(pigServer);
        data.set("foo", tuple(1,1,3), tuple(1,2,3), tuple(2,1,3), tuple(2,1,3), tuple(3,4,4));

        String script = "test   = load 'foo' using mock.Storage() as (col1: int, col2: int, col3: int);" +
                "test2 = group test by col1;" +
                "test3 = foreach test2 {" +
                "        filter_one    = filter test by (col2==1);" +
                "        generate group, filter_one;};";
        Util.registerMultiLineQuery(pigServer, script);
        Iterator<Tuple> it = pigServer.openIterator("test3");

        // Maps each expected tuple to the number of times it has been seen.
        Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
        expected.put((Tuple) Util.getPigConstant("(1,{(1,1,3)})"), 0);
        expected.put((Tuple) Util.getPigConstant("(2,{(2,1,3),(2,1,3)})"), 0);
        // Group 3 has no row with col2==1, so its output is (3, <empty bag>).
        Tuple emptyBagTuple = TupleFactory.getInstance().newTuple();
        emptyBagTuple.append(Integer.valueOf(3));
        emptyBagTuple.append(BagFactory.getInstance().newDefaultBag());
        expected.put(emptyBagTuple, 0);

        assertEachExpectedTupleSeenOnce(it, expected);
    }

    /**
     * Drains {@code actual} and verifies that it yields every key of
     * {@code expected} exactly once and nothing else, in any order.
     *
     * @param actual   iterator over the tuples produced by the query
     * @param expected expected tuples mapped to an occurrence counter that
     *                 starts at 0; the counters are updated in place
     */
    private static void assertEachExpectedTupleSeenOnce(Iterator<Tuple> actual,
            Map<Tuple, Integer> expected) {
        final int expectedCount = expected.size();
        int seen = 0;
        while (actual.hasNext()) {
            if (seen == expectedCount) {
                fail("Got more tuples than expected!");
            }
            Tuple t = actual.next();
            assertTrue("Unexpected tuple: " + t, expected.containsKey(t));
            int occurrences = expected.get(t);
            expected.put(t, occurrences + 1);
            seen++;
        }
        for (Map.Entry<Tuple, Integer> entry : expected.entrySet()) {
            assertEquals("Occurrences of " + entry.getKey(),
                    Integer.valueOf(1), entry.getValue());
        }
    }
}
