/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.pig.EvalFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestCombiner {
    private static MiniGenericCluster cluster;
    private static Properties properties;

    @BeforeClass
    public static void oneTimeSetUp() throws Exception {
        cluster = MiniGenericCluster.buildCluster();
        properties = cluster.getProperties();
    }

    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        cluster.shutDown();
    }

    @Before
    public void setUp() throws Exception {
        Util.resetStateForExecModeSwitch();
    }

    @Test
    public void testSuccessiveUserFuncs1() throws Exception {
        String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
                "c = group a by c2; " +
                "f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); " +
                "store f into 'out';";
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        PigContext pc = pigServer.getPigContext();
        assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
                .isEmpty()));
        pigServer.shutdown();
    }

    @Test
    public void testSuccessiveUserFuncs2() throws Exception {
        String dummyUDF = JiraPig1030.class.getName();
        String query = "a = load 'students.txt' as (c1,c2,c3,c4); " +
                "c = group a by c2; " +
                "f = foreach c generate COUNT(" + dummyUDF + "" +
                "(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); " +
                "store f into 'out';";
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        PigContext pc = pigServer.getPigContext();
        assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
                .isEmpty()));
        pigServer.shutdown();
    }

    @Test
    public void testOnCluster() throws Exception {
        // run the test on cluster
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        String inputFileName = runTest(pigServer);
        Util.deleteFile(cluster, inputFileName);
        pigServer.shutdown();
    }

    @Test
    public void testLocal() throws Exception {
        // run the test locally
        FileLocalizer.deleteTempFiles();
        runTest(new PigServer("local"));
        FileLocalizer.deleteTempFiles();
    }

    private String runTest(PigServer pig) throws IOException {
        List<String> inputLines = new ArrayList<String>();
        inputLines.add("a,b,1");
        inputLines.add("a,b,1");
        inputLines.add("a,c,1");
        String inputFileName = loadWithTestLoadFunc("A", pig, inputLines);

        pig.registerQuery("B = foreach A generate $0 as (c0:chararray), $1 as (c1:chararray), $2 as (c2:int);");
        pig.registerQuery("C = group B by ($0, $1);");
        pig.registerQuery("D = foreach C generate flatten(group), COUNT($1) as int;");
        // Since the input has no schema, using Util.getTuplesFromConstantTupleStrings fails assert.
        List<Tuple> resultTuples = Util.getTuplesFromConstantTupleStrings(
            new String[]{
                "('a','b',2)",
                "('a','c',1)",
            });
        Iterator<Tuple> resultIterator = pig.openIterator("D");
        Util.checkQueryOutputsAfterSort(resultIterator, resultTuples);

        return inputFileName;
    }

    private String loadWithTestLoadFunc(String loadAlias, PigServer pig,
            List<String> inputLines) throws IOException {
        File inputFile = File.createTempFile("test", "txt");
        inputFile.deleteOnExit();
        String inputFileName = inputFile.getAbsolutePath();
        if (pig.getPigContext().getExecType().isLocal()) {
            PrintStream ps = new PrintStream(new FileOutputStream(inputFile));
            for (String line : inputLines) {
                ps.println(line);
            }
            ps.close();
        } else {
            inputFileName = Util.removeColon(inputFileName);
            Util.createInputFile(cluster, inputFileName, inputLines.toArray(new String[] {}));
        }
        pig.registerQuery(loadAlias + " = load '"
                + Util.encodeEscape(inputFileName) + "' using "
                + PigStorage.class.getName() + "(',');");
        return inputFileName;
    }

    @Test
    public void testNoCombinerUse() {
        // To simulate this, we will have two input files
        // with exactly one input record - this should result
        // in two map tasks and each would process only one record
        // hence the combiner would not get called.
    }

    @Test
    public void testMultiCombinerUse() throws Exception {
        // test the scenario where the combiner is called multiple
        // times - this can happen when the output of the map > io.sort.mb
        // let's set the io.sort.mb to 1MB and > 1 MB map data.
        String[] input = new String[500 * 1024];
        for (int i = 0; i < input.length; i++) {
            if (i % 2 == 0) {
                input[i] = Integer.toString(1);
            } else {
                input[i] = Integer.toString(0);
            }
        }
        Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
        String oldValue = properties.getProperty("io.sort.mb");
        properties.setProperty("io.sort.mb", "1");

        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx1024m");
        pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
        pigServer.registerQuery("b = group a all;");
        pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
                "MIN(a.$0), MAX(a.$0), AVG(a.$0), ((double)SUM(a.$0))/COUNT(a.$0)," +
                " COUNT(a.$0) + SUM(a.$0) +  MAX(a.$0);");

        // make sure there is a combine plan in the explain output
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        pigServer.explain("c", ps);
        checkCombinerUsed(pigServer, "c", true);

        Iterator<Tuple> it = pigServer.openIterator("c");
        Tuple t = it.next();
        assertEquals(512000L, t.get(0));
        assertEquals(256000L, t.get(1));
        assertEquals(0, t.get(2));
        assertEquals(1, t.get(3));
        assertEquals(0.5, t.get(4));
        assertEquals(0.5, t.get(5));
        assertEquals(512000L + 256000L + 1, t.get(6));

        assertFalse(it.hasNext());
        Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
        // Reset io.sort.mb to the original value before exit
        if (oldValue == null) {
            properties.remove("io.sort.mb");
        } else {
            properties.setProperty("io.sort.mb", oldValue);
        }
        pigServer.shutdown();
    }

    @Test
    public void testDistinctAggs1() throws Exception {
        // test the use of combiner for distinct aggs:
        String input[] = {
                        "pig1\t18\t2.1",
                        "pig2\t24\t3.3",
                        "pig5\t45\t2.4",
                        "pig1\t18\t2.1",
                        "pig1\t19\t2.1",
                        "pig2\t24\t4.5",
                        "pig1\t20\t3.1" };

        Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
        pigServer.registerQuery("b = group a by name;");
        pigServer.registerQuery("c = foreach b  {" +
                "        x = distinct a.age;" +
                "        y = distinct a.gpa;" +
                "        z = distinct a;" +
                "        generate group, COUNT(x), SUM(x.age), SUM(y.gpa), SUM(a.age), " +
                "                       SUM(a.gpa), COUNT(z.age), COUNT(z), SUM(z.age);};");

        // make sure there is a combine plan in the explain output
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        pigServer.explain("c", ps);
        checkCombinerUsed(pigServer, "c", true);

        HashMap<String, Object[]> results = new HashMap<String, Object[]>();
        results.put("pig1", new Object[] { "pig1", 3L, 57L, 5.2, 75L, 9.4, 3L, 3L, 57L });
        results.put("pig2", new Object[] { "pig2", 1L, 24L, 7.8, 48L, 7.8, 2L, 2L, 48L });
        results.put("pig5", new Object[] { "pig5", 1L, 45L, 2.4, 45L, 2.4, 1L, 1L, 45L });
        Iterator<Tuple> it = pigServer.openIterator("c");
        while (it.hasNext()) {
            Tuple t = it.next();
            List<Object> fields = t.getAll();
            Object[] expected = results.get(fields.get(0));
            int i = 0;
            for (Object field : fields) {
                assertEquals(expected[i++], field);
            }
        }
        Util.deleteFile(cluster, "distinctAggs1Input.txt");
        pigServer.shutdown();
    }

    @Test
    public void testGroupAndUnion() throws Exception {
        // test use of combiner when group elements are accessed in the foreach
        String input1[] = {
                "ABC\t1\ta\t1",
                "ABC\t1\tb\t2",
                "ABC\t1\ta\t3",
                "ABC\t2\tb\t4",
        };

        Util.createInputFile(cluster, "testGroupElements1.txt", input1);
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.debugOn();
        pigServer.registerQuery("a1 = load 'testGroupElements1.txt' " +
                "as (str:chararray, num1:int, alph : chararray, num2 : int);");
        pigServer.registerQuery("b1 = group a1 by str;");

        // check if combiner is present or not for various forms of foreach
        pigServer.registerQuery("c1 = foreach b1  generate flatten(group), COUNT(a1.alph), SUM(a1.num2); ");

        String input2[] = {
                "DEF\t2\ta\t3",
                "DEF\t2\td\t5",
        };

        Util.createInputFile(cluster, "testGroupElements2.txt", input2);
        pigServer.registerQuery("a2 = load 'testGroupElements2.txt' " +
                "as (str:chararray, num1:int, alph : chararray, num2 : int);");
        pigServer.registerQuery("b2 = group a2 by str;");

        // check if combiner is present or not for various forms of foreach
        pigServer.registerQuery("c2 = foreach b2  generate flatten(group), COUNT(a2.alph), SUM(a2.num2); ");

        pigServer.registerQuery("c = union c1, c2;");

        List<Tuple> expectedRes =
                Util.getTuplesFromConstantTupleStrings(
                        new String[]{
                                "('ABC',4L,10L)",
                                "('DEF',2L,8L)",
                        });

        Iterator<Tuple> it = pigServer.openIterator("c");
        Util.checkQueryOutputsAfterSort(it, expectedRes);

        Util.deleteFile(cluster, "testGroupElements1.txt");
        Util.deleteFile(cluster, "testGroupElements2.txt");
        pigServer.shutdown();
    }

    @Test
    public void testGroupElements() throws Exception {
        // test use of combiner when group elements are accessed in the foreach
        String input[] = {
                        "ABC\t1\ta\t1",
                        "ABC\t1\tb\t2",
                        "ABC\t1\ta\t3",
                        "ABC\t2\tb\t4",
                        "DEF\t1\td\t1",
                        "XYZ\t1\tx\t2"
        };

        Util.createInputFile(cluster, "testGroupElements.txt", input);
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
        pigServer.registerQuery("b = group a by (str, num1);");

        // check if combiner is present or not for various forms of foreach
        pigServer.registerQuery("c = foreach b  generate flatten(group), COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", true);

        pigServer.registerQuery("c = foreach b  generate group, COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", true);

        // projecting bag - combiner should not be used
        pigServer.registerQuery("c = foreach b  generate group, a,  COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", false);

        // projecting bag - combiner should not be used
        pigServer.registerQuery("c = foreach b  generate group, a.num2,  COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", false);

        pigServer.registerQuery("c = foreach b  generate group.$0, group.$1, COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", true);

        pigServer.registerQuery("c = foreach b  generate group.$0, group.$1 + COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", true);

        pigServer.registerQuery("c = foreach b  generate group.str, group.$1, COUNT(a.alph), SUM(a.num2); ");
        checkCombinerUsed(pigServer, "c", true);

        pigServer.registerQuery("c = foreach b  generate group.str, group.$1, COUNT(a.alph), SUM(a.num2), "
                        + " (group.num1 == 1 ? (COUNT(a.num2) + 1)  : (SUM(a.num2) + 10)) ; ");
        checkCombinerUsed(pigServer, "c", true);

        List<Tuple> expectedRes =
                Util.getTuplesFromConstantTupleStrings(
                        new String[] {
                                        "('ABC',1,3L,6L,4L)",
                                        "('ABC',2,1L,4L,14L)",
                                        "('DEF',1,1L,1L,2L)",
                                        "('XYZ',1,1L,2L,2L)",
                        });

        Iterator<Tuple> it = pigServer.openIterator("c");
        Util.checkQueryOutputsAfterSort(it, expectedRes);

        Util.deleteFile(cluster, "distinctAggs1Input.txt");
        pigServer.shutdown();
    }

    @Test
    public void testGroupByLimit() throws Exception {
        Assume.assumeFalse("Skip this test for Tez till PIG-5249 is fixed",
                Util.isTezExecType(cluster.getExecType()));
        // test use of combiner when group elements are accessed in the foreach
        String input[] = {
                        "ABC 1",
                        "ABC 2",
                        "DEF 1",
                        "XYZ 1",
                        "XYZ 2",
                        "XYZ 3",
        };

        Util.createInputFile(cluster, "testGroupLimit.txt", input);
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.registerQuery("a = load 'testGroupLimit.txt'  using PigStorage(' ') " +
                "as (str:chararray, num1:int) ;");
        pigServer.registerQuery("b = group a by str;");

        pigServer.registerQuery("c = foreach b  generate group, COUNT(a.num1) ; ");

        // check if combiner is present
        pigServer.registerQuery("d = limit c 2 ; ");
        checkCombinerUsed(pigServer, "d", true);

        List<Tuple> expectedRes =
                Util.getTuplesFromConstantTupleStrings(
                        new String[] {
                                        "('ABC',2L)",
                                        "('DEF',1L)",
                        });

        Iterator<Tuple> it = pigServer.openIterator("d");
        Util.checkQueryOutputsAfterSort(it, expectedRes);
        pigServer.shutdown();
    }

    private void checkCombinerUsed(PigServer pigServer, String alias, boolean combineExpected)
            throws IOException {
        // make sure there is a combine plan in the explain output
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        pigServer.explain(alias, ps);
        boolean combinerFound;
        if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
            combinerFound = baos.toString().contains("Reduce By");
        } else {
            combinerFound = baos.toString().matches("(?si).*combine plan.*");
        }

        System.out.println(baos.toString());
        assertEquals("is combiner present as expected", combineExpected, combinerFound);
    }

    @Test
    public void testDistinctNoCombiner() throws Exception {
        // test that combiner is NOT invoked when
        // one of the elements in the foreach generate
        // is a distinct() as the leaf
        String input[] = {
                        "pig1\t18\t2.1",
                        "pig2\t24\t3.3",
                        "pig5\t45\t2.4",
                        "pig1\t18\t2.1",
                        "pig1\t19\t2.1",
                        "pig2\t24\t4.5",
                        "pig1\t20\t3.1" };

        Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
        pigServer.registerQuery("b = group a by name;");
        pigServer.registerQuery("c = foreach b  {" +
                "        z = distinct a;" +
                "        generate group, z, SUM(a.age), SUM(a.gpa);};");

        // make sure there is a combine plan in the explain output
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        pigServer.explain("c", ps);
        assertFalse(baos.toString().matches("(?si).*combine plan.*"));

        HashMap<String, Object[]> results = new HashMap<String, Object[]>();
        results.put("pig1", new Object[] { "pig1", "bag-place-holder", 75L, 9.4 });
        results.put("pig2", new Object[] { "pig2", "bag-place-holder", 48L, 7.8 });
        results.put("pig5", new Object[] { "pig5", "bag-place-holder", 45L, 2.4 });
        Iterator<Tuple> it = pigServer.openIterator("c");
        while (it.hasNext()) {
            Tuple t = it.next();
            List<Object> fields = t.getAll();
            Object[] expected = results.get(fields.get(0));
            int i = 0;
            for (Object field : fields) {
                if (i == 1) {
                    // ignore the second field which is a bag
                    // for comparison here
                    continue;
                }
                assertEquals(expected[i++], field);
            }
        }
        Util.deleteFile(cluster, "distinctNoCombinerInput.txt");
        pigServer.shutdown();
    }

    @Test
    public void testForEachNoCombiner() throws Exception {
        // test that combiner is NOT invoked when
        // one of the elements in the foreach generate
        // has a foreach in the plan without a distinct agg
        String input[] = {
                        "pig1\t18\t2.1",
                        "pig2\t24\t3.3",
                        "pig5\t45\t2.4",
                        "pig1\t18\t2.1",
                        "pig1\t19\t2.1",
                        "pig2\t24\t4.5",
                        "pig1\t20\t3.1" };

        Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
        pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
        pigServer.registerQuery("b = group a by name;");
        pigServer.registerQuery("c = foreach b  {" +
                "        z = a.age;" +
                "        generate group, z, SUM(a.age), SUM(a.gpa);};");

        // make sure there is a combine plan in the explain output
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        pigServer.explain("c", ps);
        assertFalse(baos.toString().matches("(?si).*combine plan.*"));

        HashMap<String, Object[]> results = new HashMap<String, Object[]>();
        results.put("pig1", new Object[] { "pig1", "bag-place-holder", 75L, 9.4 });
        results.put("pig2", new Object[] { "pig2", "bag-place-holder", 48L, 7.8 });
        results.put("pig5", new Object[] { "pig5", "bag-place-holder", 45L, 2.4 });
        Iterator<Tuple> it = pigServer.openIterator("c");
        while (it.hasNext()) {
            Tuple t = it.next();
            List<Object> fields = t.getAll();
            Object[] expected = results.get(fields.get(0));
            int i = 0;
            for (Object field : fields) {
                if (i == 1) {
                    // ignore the second field which is a bag
                    // for comparison here
                    continue;
                }
                assertEquals(expected[i++], field);
            }
        }
        Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
        pigServer.shutdown();
    }

    @Test
    public void testJiraPig746() throws Exception {
        // test that combiner is NOT invoked when
        // one of the elements in the foreach generate
        // has a foreach in the plan without a distinct agg
        String input[] = {
                        "pig1\t18\t2.1",
                        "pig2\t24\t3.3",
                        "pig5\t45\t2.4",
                        "pig1\t18\t2.1",
                        "pig1\t19\t2.1",
                        "pig2\t24\t4.5",
                        "pig1\t20\t3.1" };

        String expected[] = {
                        "(pig1,75,{(pig1,18,2.1),(pig1,18,2.1),(pig1,19,2.1),(pig1,20,3.1)})",
                        "(pig2,48,{(pig2,24,3.3),(pig2,24,4.5)})",
                        "(pig5,45,{(pig5,45,2.4)})"
        };

        try {
            Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);

            PigServer pigServer = new PigServer(cluster.getExecType(), properties);
            pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
            pigServer.registerQuery("b = group a by name;");
            pigServer.registerQuery("c = foreach b generate group, SUM(a.age), a;");

            // make sure there isn't a combine plan in the explain output
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            pigServer.explain("c", ps);
            assertFalse(baos.toString().matches("(?si).*combine plan.*"));

            Iterator<Tuple> it = pigServer.openIterator("c");
            Util.checkQueryOutputsAfterSortRecursive(it, expected,
                    "group:chararray,age:long,b:{t:(name:chararray,age:int,gpa:double)}");
            pigServer.shutdown();
        } finally {
            Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
        }
    }

    public static class JiraPig1030 extends EvalFunc<DataBag> {

        @Override
        public DataBag exec(Tuple input) throws IOException {
            return new DefaultDataBag();
        }
    }

    @Test
    public void testJiraPig1030() throws Exception {
        // test that combiner is NOT invoked when
        // one of the elements in the foreach generate
        // has a non-algebraic UDF that have multiple inputs
        // (one of them is distinct).

        String input[] = {
                        "pig1\t18\t2.1",
                        "pig2\t24\t3.3",
                        "pig5\t45\t2.4",
                        "pig1\t18\t2.1",
                        "pig1\t19\t2.1",
                        "pig2\t24\t4.5",
                        "pig1\t20\t3.1" };

        try {
            Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
            PigServer pigServer = new PigServer(cluster.getExecType(), properties);
            pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
            pigServer.registerQuery("b = group a all;");
            pigServer.registerQuery("c = foreach b  {" +
                    "        d = distinct a.age;" +
                    "        generate group, " + JiraPig1030.class.getName() + "(d, 0);};");

            // make sure there isn't a combine plan in the explain output
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            pigServer.explain("c", ps);
            assertFalse(baos.toString().matches("(?si).*combine plan.*"));
            pigServer.shutdown();
        } finally {
            Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
        }
    }
}
