/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.parser.ParserException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestAccumulator {
    private static final String INPUT_FILE1 = "AccumulatorInput1.txt";
    private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
    private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
    private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
    private static final String INPUT_DIR = Util.getTestDirectory(TestAccumulator.class);

    private static PigServer pigServer;
    private static Properties properties;
    private static MiniGenericCluster cluster;

    @BeforeClass
    public static void oneTimeSetUp() throws Exception {
        cluster = MiniGenericCluster.buildCluster();
        properties = cluster.getProperties();
        properties.setProperty("pig.accumulative.batchsize", "2");
        properties.setProperty("pig.exec.nocombiner", "true");
        // Reducing the number of retry attempts to speed up test completion
        properties.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS,"1");
        properties.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS,"1");
        createFiles();
    }

    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        deleteFiles();
        cluster.shutDown();
    }

    @Before
    public void setUp() throws Exception {
        Util.resetStateForExecModeSwitch();
        // Drop stale configuration from previous test run
        properties.remove(PigConfiguration.PIG_OPT_ACCUMULATOR);
        pigServer = new PigServer(cluster.getExecType(), properties);
    }

    @After
    public void tearDown() throws Exception {
        pigServer.shutdown();
        pigServer = null;
    }

    private static void createFiles() throws IOException {
        new File(INPUT_DIR).mkdirs();

        PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));

        w.println("100\tapple");
        w.println("200\torange");
        w.println("300\tstrawberry");
        w.println("300\tpear");
        w.println("100\tapple");
        w.println("300\tpear");
        w.println("400\tapple");
        w.close();

        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);

        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));

        w.println("100\t");
        w.println("100\t");
        w.println("200\t");
        w.println("200\t");
        w.println("300\tstrawberry");
        w.close();

        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);

        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE3));

        w.println("100\t1.0");
        w.println("100\t2.0");
        w.println("200\t1.1");
        w.println("200\t2.1");
        w.println("100\t3.0");
        w.println("100\t4.0");
        w.println("200\t3.1");
        w.println("100\t5.0");
        w.println("300\t3.3");
        w.println("400\t");
        w.println("400\t");
        w.close();

        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);

        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE4));

        w.println("100\thttp://ibm.com,ibm");
        w.println("100\thttp://ibm.com,ibm");
        w.println("200\thttp://yahoo.com,yahoo");
        w.println("300\thttp://sun.com,sun");
        w.close();

        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE4, INPUT_FILE4);
    }

    private static void deleteFiles() {
        Util.deleteDirectory(new File(INPUT_DIR));
    }

    @Test
    public void testAccumBasic() throws IOException{
        // test group by
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A);");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 2);
        expected.put(200, 1);
        expected.put(300, 3);
        expected.put(400, 1);

        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }

        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");

        try{
            iter = pigServer.openIterator("C");

            while(iter.hasNext()) {
                Tuple t = iter.next();
                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
            }
            fail("accumulator should not be called.");
        }catch(IOException e) {
            // should throw exception from AccumulatorBagCount.
        }

        // test cogroup
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("C = cogroup A by id, B by id;");
        pigServer.registerQuery("D = foreach C generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");

        HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
        expected2.put(100, "2,2");
        expected2.put(200, "1,1");
        expected2.put(300, "3,3");
        expected2.put(400, "1,1");

        iter = pigServer.openIterator("D");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString());
        }
    }

    @Test
    public void testAccumWithNegative() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  -org.apache.pig.test.utils.AccumulatorBagCount(A);");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, -2);
        expected.put(200, -1);
        expected.put(300, -3);
        expected.put(400, -1);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @Test
    public void testAccumWithAdd() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");

        {
            HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
            expected.put(100, 3.0);
            expected.put(200, 2.0);
            expected.put(300, 4.0);
            expected.put(400, 2.0);


            Iterator<Tuple> iter = pigServer.openIterator("C");

            while(iter.hasNext()) {
                Tuple t = iter.next();
                assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));
            }
        }

        {
            pigServer.registerQuery("C = foreach B generate group,  " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");

            HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>();
            expected.put(100, 4);
            expected.put(200, 2);
            expected.put(300, 6);
            expected.put(400, 2);


            Iterator<Tuple> iter = pigServer.openIterator("C");

            while(iter.hasNext()) {
                Tuple t = iter.next();
                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
            }
        }
    }

    @Test
    public void testAccumWithMinus() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group, " +
                " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");

        HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
        expected.put(100, 4.0);
        expected.put(200, 2.0);
        expected.put(300, 6.0);
        expected.put(400, 2.0);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));
        }
    }

    @Test
    public void testAccumWithMod() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 0);
        expected.put(200, 1);
        expected.put(300, 1);
        expected.put(400, 1);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @Test
    public void testAccumWithDivide() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 1);
        expected.put(200, 0);
        expected.put(300, 1);
        expected.put(400, 0);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @Test
    public void testAccumWithAnd() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 0);
        expected.put(200, 1);
        expected.put(300, 1);
        expected.put(400, 1);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @Test
    public void testAccumWithOr() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 1);
        expected.put(200, 0);
        expected.put(300, 1);
        expected.put(400, 0);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @Test
    public void testAccumWithRegexp() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 1);
        expected.put(200, 0);
        expected.put(300, 1);
        expected.put(400, 0);


        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @Test
    public void testAccumWithIsNull() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 0);
        expected.put(200, 0);
        expected.put(300, 1);

        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
    }

    @SuppressWarnings("unchecked")
    @Test
    public void testAccumWithIsEmpty() throws IOException{
        pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "1");
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("C = cogroup A by id outer, B by id outer;");
        pigServer.registerQuery("D = foreach C generate group," +
                "(int)(IsEmpty(A) ? 0 : SUM(A.id)) as suma," +
                "(int)(IsEmpty(B) ? 0 : SUM(B.id)) as sumb;");

        List<Tuple> expected = new ArrayList<Tuple>();
        expected.add(tuple(100, 200, 200));
        expected.add(tuple(200, 200, 400));
        expected.add(tuple(300, 900, 300));
        expected.add(tuple(400, 400, 0));

        Iterator<Tuple> iter = pigServer.openIterator("D");
        List<Tuple> actual = new ArrayList<Tuple>();

        while(iter.hasNext()) {
            actual.add(iter.next());
        }
        Collections.sort(actual);
        assertEquals(expected, actual);
    }

    @Test
    public void testAccumWithDistinct() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B { D = distinct A;" +
                "generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 2);
        expected.put(200, 2);
        expected.put(300, 3);
        expected.put(400, 2);

        Iterator<Tuple> iter = pigServer.openIterator("C");

        int count = 0;
        while(iter.hasNext()) {
            count++;
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
        }
        assertEquals(4, count);
    }

    @Test
    public void testAccumWithSort() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
        pigServer.registerQuery("B = foreach A generate id, f, id as t;");
        pigServer.registerQuery("C = group B by id;");
        pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f;" +
                "generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");

        HashMap<Integer, String> expected = new HashMap<Integer, String>();
        expected.put(100, "(apple)(apple)");
        expected.put(200, "(orange)");
        expected.put(300, "(pear)(pear)(strawberry)");
        expected.put(400, "(apple)");

        Iterator<Tuple> iter = pigServer.openIterator("D");

        int count = 0;
        while(iter.hasNext()) {
            count++;
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
        }
        assertEquals(4, count);
    }

    @Test
    public void testAccumWithBuiltinAvg() throws IOException {
      HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
      expected.put(100, 3.0);
      expected.put(200, 2.1);
      expected.put(300, 3.3);
      expected.put(400, null);
      // Test all the averages for correct behaviour with null values
      String[] types = { "double", "float", "int", "long" };
      for (int i = 0; i < types.length; i++) {
          if (i > 1) { // adjust decimal error for non real types
              expected.put(200, 2.0);
              expected.put(300, 3.0);
          }
          pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
                  + types[i] + ");");
          pigServer.registerQuery("C = group A by id;");
          pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");
          Iterator<Tuple> iter = pigServer.openIterator("D");

          while (iter.hasNext()) {
              Tuple t = iter.next();
              Double v = expected.get((Integer) t.get(0));
              if (v != null) {
                  assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(),
                          0.0001);
              } else {
                  assertNull(t.get(1));
              }
          }
      }
    }

    @Test
    public void testAccumWithBuiltin() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
        pigServer.registerQuery("C = group A by id;");
        // moving AVG accumulator test to separate test case
        pigServer.registerQuery("D = foreach C generate group, SUM(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");

        HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
        expected.put(100, new Double[]{15.0, 5.0, 1.0, 5.0});
        expected.put(200, new Double[]{6.3, 3.0, 1.1, 3.1});
        expected.put(300, new Double[]{3.3, 1.0, 3.3, 3.3});
        expected.put(400, new Double[] { null, 0.0, null, null });

        Iterator<Tuple> iter = pigServer.openIterator("D");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            Double[] v = expected.get((Integer)t.get(0));
            for(int i=0; i<v.length; i++) {
                if (v[i] != null) {
                    assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1))
                            .doubleValue(), 0.0001);
                } else {
                    assertNull(t.get(i + 1));
                }
            }
        }
    }

    // Pig 4365
    @Test
    public void testAccumWithTOP() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
        pigServer.registerQuery("B = group A all;");
        pigServer.registerQuery("D = foreach B { C = TOP(5, 0, A); generate flatten(C); }");
        
        Iterator<Tuple> iter = pigServer.openIterator("D");
    
        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(
                new String[] {"(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)" });
        
        Util.checkQueryOutputsAfterSort(iter, expected);
    }

    @Test
    public void testAccumWithMultiBuiltin() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");
        pigServer.registerQuery("C = group A by 1;");
        pigServer.registerQuery("D = foreach C generate SUM(A.id), 1+SUM(A.id)+SUM(A.id);");

        Iterator<Tuple> iter = pigServer.openIterator("D");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            t.get(0).toString().equals("1700");
            t.get(1).toString().equals("3401");
        }
    }

    // Pig 1105
    @Test
    public void testAccumCountStar() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
        pigServer.registerQuery("C = group A by id;");
        pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");

        pigServer.openIterator("D");
    }

    /**
     * see PIG-1963.
     * If there is a POSort or PODistinct still remaining in the plan
     * (after secondary sort optimization), accumulative mode can't
     * be used as they are blocking operators
     * @throws IOException
     */
    @Test
    public void testAccumulatorOffOnSort() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        //one POSort will remain because secondary sort can be used only for one of them
        pigServer.registerQuery("C = foreach B " +
                        "{ " +
                        "  o1 = order A by fruit;" +
                        "  o2 = order A by fruit desc;" +
                        "  generate  org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
                        "                  org.apache.pig.test.utils.AccumulativeSumBag(o2.fruit); " +
                        "};");

        checkAccumulatorOff("C");
    }

    /**
     * see PIG-1963.
     * If there is a POSort or PODistinct still remaining in the plan
     * (after secondary sort optimization), accumulative mode can't
     * be used as they are blocking operators
     * @throws IOException
     */
    @Test
    public void testAccumulatorOffOnDistinct() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit, category);");
        pigServer.registerQuery("B = group A by id;");

        pigServer.registerQuery("C = foreach B " +
                        "{ " +
                        "  o1 = order A by fruit;" +
                        "  d2 = distinct A.category;" +
                        "  generate  org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
                        "                  org.apache.pig.test.utils.AccumulativeSumBag(d2); " +
                        "};");

        checkAccumulatorOff("C");
    }

    @Test
    public void testAccumulatorOff() throws IOException{
        pigServer.getPigContext().getProperties().setProperty(
                PigConfiguration.PIG_OPT_ACCUMULATOR, "false");

        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group," +
                "org.apache.pig.test.utils.AccumulativeSumBag(A);");

        checkAccumulatorOff("C");
        pigServer.getPigContext().getProperties().setProperty(
                PigConfiguration.PIG_OPT_ACCUMULATOR, "true");

    }

    private void checkAccumulatorOff(String alias) {
        try {
            Iterator<Tuple> iter = pigServer.openIterator(alias);
            while(iter.hasNext()) {
                iter.next();
            }
            fail("Accumulator should be off.");
        }catch(Exception e) {
            // we should get exception
        }
    }

    @Test
    public void testAccumWithMap() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
        pigServer.registerQuery("B = group A by (id, url);");
        pigServer.registerQuery("C = foreach B generate COUNT(A)," +
                "org.apache.pig.test.utils.URLPARSE(group.url)#'url';");

        HashMap<Integer, String> expected = new HashMap<Integer, String>();
        expected.put(2, "http://ibm.com");
        expected.put(1, "http://yahoo.com");
        expected.put(1, "http://sun.com");

        Iterator<Tuple> iter = pigServer.openIterator("C");

        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Long)t.get(0)), (String)t.get(1));
        }
    }

    @Test // PIG-1837
    public void testBinCondCheck() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter("data1"));
        w.println("1\t11");
        w.println("2\t15");
        w.close();

        Util.copyFromLocalToCluster(cluster, "data1", "data1");

        w = new PrintWriter(new FileWriter("data2"));
        w.println("1\t10");
        w.println("4\t11");
        w.println("5\t10");
        w.close();

        Util.copyFromLocalToCluster(cluster, "data2", "data2");

        pigServer.registerQuery("A = load 'data1' as (x:int, y:int);");
        pigServer.registerQuery("B = load 'data2' as (x:int, z:int);");
        pigServer.registerQuery("C = cogroup A by x, B by x;");
        pigServer.registerQuery("D = foreach C generate group," +
                "SUM((IsEmpty(A.y) ? {(0)} : A.y)) + SUM((IsEmpty(B.z) ? {(0)} : B.z));");

        HashMap<Integer, Long> expected = new HashMap<Integer, Long>();
        expected.put(1, 21l);
        expected.put(2, 15l);
        expected.put(4, 11l);
        expected.put(5, 10l);

        Iterator<Tuple> iter = pigServer.openIterator("D");
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Long)t.get(1));
        }
    }

    /**
     * see PIG-1911 .
     * accumulator udf reading from a nested relational op. generate projects
     * only the accumulator udf.
     * @throws IOException
     * @throws ParseException
     */
    @Test
    public void testAccumAfterNestedOp() throws IOException, ParserException{
        // test group by
        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = foreach A generate id;"); //additional test to enable columnprune(PIG-5224)
        pigServer.registerQuery("C = group B by id;");
        pigServer.registerQuery("D = foreach C " +
                        "{ o = order B by id; " +
                        "  generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; ");

        Iterator<Tuple> iter = pigServer.openIterator("D");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(2)",
                            "(1)",
                            "(3)",
                            "(1)"
                    });
        Util.checkQueryOutputsAfterSort(iter, expectedRes);
    }


}
