/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.test;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigRunner;
import org.apache.pig.PigRunner.ReturnCode;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.newplan.Operator;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestPigRunner {

    private static MiniGenericCluster cluster;
    private static String execType;

    private static final String INPUT_FILE = "input";
    private static final String OUTPUT_FILE = "output";
    private static final String PIG_FILE = "test.pig";

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        cluster = MiniGenericCluster.buildCluster();
        execType = cluster.getExecType().name().toLowerCase();
        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
        w.println("1\t2\t3");
        w.println("5\t3\t4");
        w.println("3\t4\t5");
        w.println("5\t6\t7");
        w.println("3\t7\t8");
        w.close();
        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        new File(INPUT_FILE).delete();
        cluster.shutDown();
    }

    @Before
    public void setUp() {
        deleteAll(new File(OUTPUT_FILE));
        Util.resetStateForExecModeSwitch();
    }

    @Test
    public void testErrorLogFile() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = foreach A generate StringSize(a0);");
        w.println("store B into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", "local", PIG_FILE };
            PigStats stats = PigRunner.run(args, null);

            assertTrue(!stats.isSuccessful());

            Properties props = stats.getPigProperties();
            String logfile = props.getProperty("pig.logfile");
            File f = new File(logfile);
            assertTrue(f.exists());
        } finally {
            new File(PIG_FILE).delete();
        }
    }

    @Test
    public void testErrorLogFile2() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = foreach A generate StringSize(a0);");
        w.println("store B into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-M", "-x", "local", PIG_FILE };
            PigStats stats = PigRunner.run(args, null);

            assertTrue(!stats.isSuccessful());

            Properties props = stats.getPigProperties();
            // If test on nfs, the pig script complaining "output" exists
            // and does not actually launch the job. This could due to a mapreduce
            // bug which removing file before closing it.
            // If this happens, props is null because we only set pigContext before
            // launching job.
            if (props!=null) {
                String logfile = props.getProperty("pig.logfile");
                File f = new File(logfile);
                assertTrue(f.exists());
            }
        } finally {
            new File(PIG_FILE).delete();
        }
    }

    @Test
    public void simpleTest() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = group A by a0;");
        w.println("C = foreach B generate group, COUNT(A);");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            String name = stats.getOutputNames().get(0);
            assertEquals(OUTPUT_FILE, name);
            assertEquals(12, stats.getBytesWritten());
            assertEquals(3, stats.getRecordWritten());

            assertEquals("A,B,C",
                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());

            Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
            assertTrue(conf.getBoolean("stop.on.failure", false));
            assertTrue(!conf.getBoolean("aggregate.warning", true));
            assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true));
            assertTrue(!conf.getBoolean("opt.fetch", true));
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void simpleTest2() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = filter A by a0 == 3;");
        w.println("C = limit B 1;");
        w.println("dump C;");
        w.close();

        try {
            String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());
            if (execType.toString().startsWith("tez")) {
                assertEquals(1, stats.getNumberJobs());
                assertEquals(stats.getJobGraph().size(), 1);
            } else {
                assertEquals(2, stats.getNumberJobs());
                assertEquals(stats.getJobGraph().size(), 2);
            }

            Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties());
            assertTrue(conf.getBoolean("stop.on.failure", false));
            assertTrue(!conf.getBoolean("aggregate.warning", true));
            assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true));
            assertTrue(conf.getBoolean("opt.fetch", true));
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void scriptsInDfsTest() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = group A by a0;");
        w.println("C = foreach B generate group, COUNT(A);");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();
        Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);
        Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);

        try {
            String[] args = { "-x", execType, inputInDfs.toString() };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertTrue(stats.getJobGraph().size() == 1);
            String name = stats.getOutputNames().get(0);
            assertEquals(OUTPUT_FILE, name);
            assertEquals(12, stats.getBytesWritten());
            assertEquals(3, stats.getRecordWritten());

            assertEquals("A,B,C",
                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, PIG_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void orderByTest() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = order A by a0;");
        w.println("C = limit B 2;");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();
        String[] args = { "-x", execType, PIG_FILE };
        try {
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
            assertTrue(stats.isSuccessful());
            if (execType.equals("tez")) {
                assertEquals(stats.getJobGraph().size(), 1);
                // 5 vertices
                assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
            } else {
                assertEquals(stats.getJobGraph().size(), 4);
            }
            assertTrue(stats.getJobGraph().getSinks().size() == 1);
            assertTrue(stats.getJobGraph().getSources().size() == 1);
            JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
            assertEquals(OUTPUT_FILE, js.getOutputs().get(0).getName());
            assertEquals(2, js.getOutputs().get(0).getNumberRecords());
            assertEquals(12, js.getOutputs().get(0).getBytes());
            assertEquals(OUTPUT_FILE, stats.getOutputNames().get(0));
            assertEquals(2, stats.getRecordWritten());
            assertEquals(12, stats.getBytesWritten());

            if (execType.equals("tez")) {
                assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
                        0)).getAlias());
                // TODO: alias is not set for sample-aggregation/partition/sort job.
                //       Need to investigate
                // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
                //        js).get(0)).getAlias());
            } else {
                assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
                        0)).getAlias());
                assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
                        js).get(0)).getAlias());
                assertEquals("B", js.getAlias());
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void simpleMultiQueryTest() throws Exception {
        final String OUTPUT_FILE_2 = "output2";

        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = filter A by a0 >= 4;");
        w.println("C = filter A by a0 < 4;");
        w.println("store B into '" + OUTPUT_FILE_2 + "';");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
            assertTrue(stats.isSuccessful());
            assertTrue(stats.getJobGraph().size() == 1);
            // Each output file should include the following:
            // output:
            //   1\t2\t3\n
            //   3\t4\t5\n
            //   3\t7\t8\n
            // output2:
            //   5\t3\t4\n
            //   5\t6\t7\n
            final int numOfRecords = 5;
            final int numOfCharsPerRecord = 6;
            assertEquals(numOfRecords, stats.getRecordWritten());
            assertEquals(numOfRecords * numOfCharsPerRecord, stats.getBytesWritten());
            assertTrue(stats.getOutputNames().size() == 2);
            for (String fname : stats.getOutputNames()) {
                assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
                if (fname.equals(OUTPUT_FILE)) {
                    assertEquals(3, stats.getNumberRecords(fname));
                } else {
                    assertEquals(2, stats.getNumberRecords(fname));
                }
            }
            assertEquals("A,B,C",
                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE_2);
        }
    }

    @Test
    public void simpleMultiQueryTest2() throws Exception {
        final String OUTPUT_FILE_2 = "output2";

        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = filter A by a0 >= 4;");
        w.println("C = filter A by a0 < 4;");
        w.println("D = group C by a0;");
        w.println("E = foreach D generate group, COUNT(C);");
        w.println("store B into '" + OUTPUT_FILE_2 + "';");
        w.println("store E into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
            assertTrue(stats.isSuccessful());
            assertEquals(stats.getJobGraph().size(), 1);

            // Each output file should include the following:
            // output:
            //   5\t3\t4\n
            //   5\t6\t7\n
            // output2:
            //   1\t1\n
            //   3\t2\n
            final int numOfRecords1 = 2;
            final int numOfRecords2 = 2;
            final int numOfCharsPerRecord1 = 6;
            final int numOfCharsPerRecord2 = 4;
            assertEquals(numOfRecords1 + numOfRecords2, stats.getRecordWritten());
            assertEquals((numOfRecords1 * numOfCharsPerRecord1) + (numOfRecords2 * numOfCharsPerRecord2),
                    stats.getBytesWritten());
            assertTrue(stats.getOutputNames().size() == 2);
            for (String fname : stats.getOutputNames()) {
                assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
                if (fname.equals(OUTPUT_FILE)) {
                    assertEquals(2, stats.getNumberRecords(fname));
                } else {
                    assertEquals(2, stats.getNumberRecords(fname));
                }
            }
            assertEquals("A,B,C,D,E",
                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE_2);
        }
    }

    @Test
    public void simpleMultiQueryTest3() throws Exception {
        final String INPUT_FILE_2 = "input2";
        final String OUTPUT_FILE_2 = "output2";

        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_2));
        w.println("3\t4\t5");
        w.println("5\t6\t7");
        w.println("3\t7\t8");
        w.close();
        Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
        new File(INPUT_FILE_2).delete();

        w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("A1 = load '" + INPUT_FILE_2 + "' as (a0:int, a1:int, a2:int);");
        w.println("B = filter A by a0 == 3;");
        w.println("C = filter A by a1 <=5;");
        w.println("D = join C by a0, B by a0, A1 by a0 using 'replicated';");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.println("store D into '" + OUTPUT_FILE_2 + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
            assertTrue(stats.isSuccessful());
            if (Util.isMapredExecType(cluster.getExecType())) {
                assertEquals(3, stats.getJobGraph().size());
            } else {
                assertEquals(1, stats.getJobGraph().size());
            }

            // Each output file should include the following:
            // output:
            //   1\t2\t3\n
            //   5\t3\t4\n
            //   3\t4\t5\n
            // output2:
            //   3\t4\t5\t3\t4\t5\t3\t4\t5\n
            //   3\t4\t5\t3\t4\t5\t3\t7\t8\n
            //   3\t4\t5\t3\t7\t8\t3\t4\t5\n
            //   3\t4\t5\t3\t4\t5\t3\t7\t8\n
            final int numOfRecords1 = 3;
            final int numOfRecords2 = 4;
            final int numOfBytesWritten1 = 18;
            final int numOfBytesWritten2 = 72;

            assertEquals(numOfRecords1 + numOfRecords2, stats.getRecordWritten());
            assertEquals(numOfBytesWritten1 + numOfBytesWritten2, stats.getBytesWritten());

            List<String> outputNames = new ArrayList<String>(stats.getOutputNames());
            assertTrue(outputNames.size() == 2);
            Collections.sort(outputNames);
            assertEquals(OUTPUT_FILE, outputNames.get(0));
            assertEquals(OUTPUT_FILE_2, outputNames.get(1));
            assertEquals(3, stats.getNumberRecords(OUTPUT_FILE));
            assertEquals(4, stats.getNumberRecords(OUTPUT_FILE_2));

            List<InputStats> inputStats = new ArrayList<InputStats>(stats.getInputStats());
            assertTrue(inputStats.size() == 2);
            Collections.sort(inputStats, new Comparator<InputStats>() {
                @Override
                public int compare(InputStats o1, InputStats o2) {
                    return o1.getLocation().compareTo(o2.getLocation());
                }
            });
            assertEquals(5, inputStats.get(0).getNumberRecords());
            assertEquals(3, inputStats.get(1).getNumberRecords());
            // For mapreduce, since hdfs bytes read includes replicated tables bytes read is wrong
            // Since Tez does has only one load per job its values are correct
            if (!Util.isMapredExecType(cluster.getExecType())) {
                assertEquals(30, inputStats.get(0).getBytes());
                assertEquals(18, inputStats.get(1).getBytes());
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, INPUT_FILE_2);
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE_2);
        }
    }

    @Test
    public void MQDepJobFailedTest() throws Exception {
        final String OUTPUT_FILE_2 = "output2";
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (name:chararray, a1:int, a2:int);");
        w.println("store A into '" + OUTPUT_FILE_2 + "';");
        w.println("B = FOREACH A GENERATE org.apache.pig.test.utils.UPPER(name);");
        w.println("C= order B by $0;");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();
        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, null);
            Iterator<JobStats> iter = stats.getJobGraph().iterator();
            while (iter.hasNext()) {
                 JobStats js=iter.next();
                 if (execType.equals("tez")) {
                     assertEquals(js.getState().name(), "FAILED");
                 } else {
                     if(js.getState().name().equals("FAILED")) {
                         List<Operator> ops=stats.getJobGraph().getSuccessors(js);
                         for(Operator op : ops ) {
                             assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
                         }
                     }
                 }
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE_2);
        }
    }

    @Test
    public void simpleNegativeTest() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = group A by a;");
        w.println("C = foreach B generate group, COUNT(A);");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();
        String[] args = { "-x", execType, "-c", PIG_FILE };
        PigStats stats = PigRunner.run(args, null);
        assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
        // TODO: error message has changed. Need to catch the new message generated from the
        // new parser.
//        assertTrue(stats.getErrorCode() == 1000);
//        assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: int,a2: int}",
//                stats.getErrorMessage());
    }

    @Test
    public void simpleNegativeTest2() throws Exception {
        String[] args = { "-x", execType, "-c", "-e", "this is a test" };
        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
        assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
    }

    @Test
    public void simpleNegativeTest3() throws Exception {
        String[] args = { "-x", execType, "-c", "-y" };
        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
        assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
        assertEquals("Found unknown option (-y) at position 4",
                stats.getErrorMessage());
    }

    @Test
    public void streamNegativeTest() throws Exception {
        Assume.assumeTrue("Skip this test for TEZ temporarily as it hangs", Util.isMapredExecType(cluster.getExecType()));
        final String OUTPUT_FILE_2 = "output2";
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = foreach A generate 1;");
        w.println("C = foreach A generate 0/0;");
        w.println("store B into '" + OUTPUT_FILE + "';");
        w.println("store C into '" + OUTPUT_FILE_2 + "';");
        w.println("D = load '" + OUTPUT_FILE_2 + "';");
        w.println("E = stream D through `false`;");
        w.println("store E into 'ee';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, null);
            assertTrue(!stats.isSuccessful());
            if (execType.equals("tez")) {
                assertTrue(stats.getReturnCode() == ReturnCode.FAILURE);
                assertTrue(stats.getJobGraph().size() == 1);
                JobStats job = (JobStats)stats.getJobGraph().getSinks().get(0);
                assertTrue(!job.isSuccessful());
                assertTrue(stats.getOutputStats().size() == 3);
                for (OutputStats output : stats.getOutputStats()) {
                    assertTrue(!output.isSuccessful());
                }
            } else {
                assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
                assertTrue(stats.getJobGraph().size() == 2);
                JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
                assertTrue(job.isSuccessful());
                job = (JobStats)stats.getJobGraph().getSinks().get(0);
                assertTrue(!job.isSuccessful());
                assertTrue(stats.getOutputStats().size() == 3);
                for (OutputStats output : stats.getOutputStats()) {
                    if (output.getName().equals("ee")) {
                        assertTrue(!output.isSuccessful());
                    } else {
                        assertTrue(output.isSuccessful());
                    }
                }
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE_2);
        }
    }

    @Test
    public void testIsTempFile() throws Exception {
        PigContext context = new PigContext(Util.getLocalTestMode(), new Properties());
        context.connect();
        for (int i=0; i<100; i++) {
            String file = FileLocalizer.getTemporaryPath(context).toString();
            assertTrue("not a temp file: " + file, PigStatsUtil.isTempFile(file));
        }
    }

    @Test
    public void testCounterName() throws Exception {
        String s = "jdbc:hsqldb:file:/tmp/batchtest;hsqldb.default_table_type=cached;hsqldb.cache_rows=100";
        String name = MRPigStatsUtil.getMultiInputsCounterName(s, 0);
        assertEquals(MRPigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "_0_batchtest", name);
        s = "file:///tmp/batchtest{1,2}.txt";
        name = MRPigStatsUtil.getMultiInputsCounterName(s, 1);
        assertEquals(MRPigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "_1_batchtest{1,2}.txt", name);
        s = "file:///tmp/batchtest*.txt";
        name = MRPigStatsUtil.getMultiInputsCounterName(s, 2);
        assertEquals(MRPigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "_2_batchtest*.txt", name);
    }

    @Test
    public void testLongCounterName() throws Exception {
        // Pig now restricts the string size of its counter name to less than 64 characters.
        PrintWriter w = new PrintWriter(new FileWriter("myinputfile"));
        w.println("1\t2\t3");
        w.println("5\t3\t4");
        w.println("3\t4\t5");
        w.println("5\t6\t7");
        w.println("3\t7\t8");
        w.close();
        String longfilename = "longlonglonglonglonglonglonglonglonglonglonglongfilefilefilename";
        Util.copyFromLocalToCluster(cluster, "myinputfile", longfilename);

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = load '" + longfilename + "' as (a0:int, a1:int, a2:int);");
        w1.println("C = join A by a0, B by a0;");
        w1.println("store C into '" + OUTPUT_FILE + "';");
        w1.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<InputStats> inputs = stats.getInputStats();
            assertEquals(2, inputs.size());
            for (InputStats instats : inputs) {
                assertEquals(5, instats.getNumberRecords());
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void testDuplicateCounterName() throws Exception {
        // Pig now restricts the string size of its counter name to less than 64 characters.
        PrintWriter w = new PrintWriter(new FileWriter("myinputfile"));
        w.println("1\t2\t3");
        w.println("5\t3\t4");
        w.close();
        String samefilename = "tmp/input";
        Util.copyFromLocalToCluster(cluster, "myinputfile", samefilename);

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = load '" + samefilename + "' as (a0:int, a1:int, a2:int);");
        w1.println("C = join A by a0, B by a0;");
        w1.println("store C into '" + OUTPUT_FILE + "';");
        w1.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<InputStats> inputs = stats.getInputStats();
            assertEquals(2, inputs.size());
            for (InputStats instats : inputs) {
                if (instats.getLocation().endsWith("tmp/input")) {
                    assertEquals(2, instats.getNumberRecords());
                } else {
                    assertEquals(5, instats.getNumberRecords());
                }
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void testDuplicateCounterName2() throws Exception {

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = filter A by a0 > 3;");
        w1.println("store A into 'output';");
        w1.println("store B into 'tmp/output';");
        w1.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<OutputStats> outputs = stats.getOutputStats();
            assertEquals(2, outputs.size());
            for (OutputStats outstats : outputs) {
                if (outstats.getLocation().endsWith("tmp/output")) {
                    assertEquals(2, outstats.getNumberRecords());
                } else {
                    assertEquals(5, outstats.getNumberRecords());
                }
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, "tmp/output");
        }
    }

    @Test
    public void testRegisterExternalJar() throws Exception {
        String jarName = Util.findPigJarName();

        String[] args = { "-Dpig.additional.jars=" + jarName,
                "-Dmapred.job.queue.name=default", "-x", execType,
                "-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" };
        PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

        Util.deleteFile(cluster, OUTPUT_FILE);
        PigContext ctx = stats.getPigContext();

        assertNotNull(ctx);

        assertTrue(ctx.extraJars.contains(ClassLoader.getSystemResource(jarName)));
        assertTrue("default", ctx.getProperties().getProperty(MRConfiguration.JOB_QUEUE_NAME) != null
                && ctx.getProperties().getProperty(MRConfiguration.JOB_QUEUE_NAME).equals("default")
                || ctx.getProperties().getProperty("mapreduce.job.queuename") != null
                && ctx.getProperties().getProperty("mapreduce.job.queuename").equals("default"));

    }

    @Test
    public void classLoaderTest() throws Exception {
        // Skip in hadoop 23 test, see PIG-2449
        if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
            return;
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
        w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();");
        w.println("store A into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
            assertTrue(stats.isSuccessful());
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void fsCommandTest() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("fs -mv nonexist.file dummy.file");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(!stats.isSuccessful());
            assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION);
        } finally {
            new File(PIG_FILE).delete();
        }
    }

    @Test // PIG-2006
    public void testEmptyFile() throws IOException {
        File f1 = new File(PIG_FILE);

        FileWriter fw1 = new FileWriter(f1);
        fw1.close();

        try {
           String[] args = { "-x", "local", "-c", PIG_FILE };
           PigStats stats = PigRunner.run(args, null);

           assertTrue(stats.isSuccessful());
           assertEquals( 0, stats.getReturnCode() );
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void returnCodeTest() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load 'non-existine.file' as (a0:int, a1:int, a2:int);");
        w.println("B = filter A by a0 > 0;;");
        w.println("C = group B by $0;");
        w.println("D = join C by $0, B by $0;");
        w.println("store D into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, null);

            assertTrue(!stats.isSuccessful());
            assertTrue(stats.getReturnCode() != 0);
            assertTrue(stats.getOutputStats().size() == 0);

        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test
    public void returnCodeTest2() throws Exception {
        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load 'non-existine.file' as (a0, a1);");
        w.println("B = load 'data' as (b0, b1);");
        w.println("C = join B by b0, A by a0 using 'repl';");
        w.println("store C into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, null);

            assertTrue(!stats.isSuccessful());
            assertTrue(stats.getReturnCode() != 0);
            assertTrue(stats.getOutputStats().size() == 0);

        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }


    @Test //PIG-1893
    public void testEmptyFileCounter() throws Exception {

        PrintWriter w = new PrintWriter(new FileWriter("myinputfile"));
        w.close();

        Util.copyFromLocalToCluster(cluster, "myinputfile", "1.txt");

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = load '1.txt' as (a0:int, a1:int, a2:int);");
        w1.println("C = join A by a0, B by a0;");
        w1.println("store C into '" + OUTPUT_FILE + "';");
        w1.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<InputStats> inputs = stats.getInputStats();
            assertEquals(2, inputs.size());
            for (InputStats instats : inputs) {
                if (instats.getLocation().endsWith("1.txt")) {
                    assertEquals(0, instats.getNumberRecords());
                } else {
                    assertEquals(5, instats.getNumberRecords());
                }
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test //PIG-1893
    public void testEmptyFileCounter2() throws Exception {

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = filter A by a0 < 0;");
        w1.println("store A into '" + OUTPUT_FILE + "';");
        w1.println("store B into 'output2';");
        w1.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<OutputStats> outputs = stats.getOutputStats();
            assertEquals(2, outputs.size());
            for (OutputStats outstats : outputs) {
                if (outstats.getLocation().endsWith("output2")) {
                    assertEquals(0, outstats.getNumberRecords());
                } else {
                    assertEquals(5, outstats.getNumberRecords());
                }
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, "output2");
        }
    }

    @Test // PIG-2208: Restrict number of PIG generated Haddop counters
    public void testDisablePigCounters() throws Exception {
        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("C = join A by a0, B by a0;");
        w1.println("store C into '" + OUTPUT_FILE + "';");
        w1.close();

        try {
            String[] args = {"-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<InputStats> inputs = stats.getInputStats();
            assertEquals(2, inputs.size());
            if (execType.equals("tez")) {
                assertEquals(5, inputs.get(0).getNumberRecords());
                assertEquals(5, inputs.get(1).getNumberRecords());
            } else {
                for (InputStats instats : inputs) {
                    // the multi-input counters are disabled
                    assertEquals(-1, instats.getNumberRecords());
                }
            }

            List<OutputStats> outputs = stats.getOutputStats();
            assertEquals(1, outputs.size());
            OutputStats outstats = outputs.get(0);
            assertEquals(9, outstats.getNumberRecords());
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
        }
    }

    @Test             //Pig-2358
    public void testGetHadoopCounters() throws Exception {
        final String OUTPUT_FILE_2 = "output2";

        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w.println("B = filter A by a0 >= 4;");
        w.println("C = filter A by a0 < 4;");
        w.println("D = group C by a0;");
        w.println("E = foreach D generate group, COUNT(C);");
        w.println("store B into '" + OUTPUT_FILE_2 + "';");
        w.println("store E into '" + OUTPUT_FILE + "';");
        w.close();

        try {
            String[] args = { "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            String TASK_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.TaskCounter" : MRPigStatsUtil.TASK_COUNTER_GROUP;
            String FS_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.FileSystemCounter" : MRPigStatsUtil.FS_COUNTER_GROUP;

            if (execType.equals("tez")) {
                Counters counter= ((JobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
                assertEquals(5, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
                        "INPUT_RECORDS_PROCESSED").getValue());
                assertEquals(2, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
                assertEquals(7, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName(
                        "OUTPUT_RECORDS").getValue());
                assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
                assertEquals(new File(INPUT_FILE).length(),counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.HDFS_BYTES_READ).getValue());
            } else {
                Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
                assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.MAP_INPUT_RECORDS).getValue());
                assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue());
                assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue());
                assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue());
                assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());

                // Skip for hadoop 20.203+, See PIG-2446
                if (Util.isHadoop203plus())
                    return;

                assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName(
                        MRPigStatsUtil.HDFS_BYTES_READ).getValue());
            }
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, OUTPUT_FILE_2);
        }
    }

    @Test // PIG-2208: Restrict number of PIG generated Haddop counters
    public void testDisablePigCounters2() throws Exception {

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
        w1.println("B = filter A by a0 > 3;");
        w1.println("store A into 'output';");
        w1.println("store B into 'tmp/output';");
        w1.close();

        try {
            String[] args = { "-Dpig.disable.counter=true", "-x", execType, PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(stats.isSuccessful());

            assertEquals(1, stats.getNumberJobs());
            List<OutputStats> outputs = stats.getOutputStats();
            assertEquals(2, outputs.size());
            if (execType.equals("tez")) {
                assertEquals(outputs.get(0).getNumberRecords(), 5);
                assertEquals(outputs.get(1).getNumberRecords(), 2);
            } else {
                for (OutputStats outstats : outputs) {
                    // the multi-output counters are disabled
                    assertEquals(-1, outstats.getNumberRecords());
                }
            }

            List<InputStats> inputs = stats.getInputStats();
            assertEquals(1, inputs.size());
            InputStats instats = inputs.get(0);
            assertEquals(5, instats.getNumberRecords());
        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, "tmp/output");
        }
    }

    /**
     * PIG-2780: In this test case, Pig submits three jobs at the same time and
     * one of them will fail due to nonexistent input file. If users enable
     * stop.on.failure, then Pig should immediately stop if anyone of the three
     * jobs has failed.
     */
    @Test
    public void testStopOnFailure() throws Exception {

        PrintWriter w1 = new PrintWriter(new FileWriter(PIG_FILE));
        w1.println("A1 = load '" + INPUT_FILE + "';");
        w1.println("B1 = load 'nonexist';");
        w1.println("C1 = load '" + INPUT_FILE + "';");
        w1.println("A2 = distinct A1;");
        w1.println("B2 = distinct B1;");
        w1.println("C2 = distinct C1;");
        w1.println("ret = union A2,B2,C2;");
        w1.println("store ret into 'tmp/output';");
        w1.close();

        try {
            String[] args = { "-x", execType, "-F", PIG_FILE };
            PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));

            assertTrue(!stats.isSuccessful());

            int successfulJobs = 0;
            Iterator<Operator> it = stats.getJobGraph().getOperators();
            while (it.hasNext()){
                JobStats js = (JobStats)it.next();
                if (js.isSuccessful())
                    successfulJobs++;
            }

            // we should have less than 2 successful jobs
            assertTrue("Should have less than 2 successful jobs", successfulJobs < 2);

        } finally {
            new File(PIG_FILE).delete();
            Util.deleteFile(cluster, OUTPUT_FILE);
            Util.deleteFile(cluster, "tmp/output");
        }
    }

    @Test
    public void testStoredScriptContents() throws Exception {
        String scriptContents = "sh echo success;\n";
        FileUtils.writeStringToFile(new File(PIG_FILE), scriptContents);
        Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);

        Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
        try {
            runAndValidateStoredScriptContents(PIG_FILE, scriptContents);
            runAndValidateStoredScriptContents(inputInDfs.toString(), scriptContents);
        } finally {
            FileUtils.deleteQuietly(new File(PIG_FILE));
            Util.deleteQuietly(cluster, PIG_FILE);
        }
    }

    @Test
    public void testErrorLogUnderCustomDir() throws Exception {
        try (PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE))) {
            w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
            w.println("B = foreach A generate StringSize(a0);");
            w.println("store B into '" + OUTPUT_FILE + "';");
        }
        Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE);

        Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE);
        try {
            runAndValidateCustomErrorLogDir(PIG_FILE);
            runAndValidateCustomErrorLogDir(inputInDfs.toString());
        } finally {
            FileUtils.deleteQuietly(new File(PIG_FILE));
            Util.deleteQuietly(cluster, PIG_FILE);
        }
    }

    private void runAndValidateStoredScriptContents(String scriptPath, String expectedContents) {
        PigStats stats = runPigLocally(scriptPath);
        assertTrue(stats.isSuccessful());
        assertEquals(expectedContents, stats.getScript());

        stats = runPigLocally("-f", scriptPath);
        assertTrue(stats.isSuccessful());
        assertEquals(expectedContents, stats.getScript());
    }

    private void runAndValidateCustomErrorLogDir(String scriptPath) throws IOException {
        File logsFolder = temporaryFolder.newFolder();
        String logsPath = logsFolder.getAbsolutePath();
        assertFileCountUnderDir(logsFolder, 0);

        PigStats stats = runPigLocally("-l", logsPath, scriptPath);
        assertFalse(stats.isSuccessful());
        assertFileCountUnderDir(logsFolder, 1);

        stats = runPigLocally("-l", logsPath, "-f", scriptPath);
        assertFalse(stats.isSuccessful());
        assertFileCountUnderDir(logsFolder, 2);
    }

    private void assertFileCountUnderDir(File directory, int expectedFileCount) throws IOException {
        String[] files = directory.list();
        assertNotNull(files);
        assertEquals(expectedFileCount, files.length);
    }

    private PigStats runPigLocally(String... extraArgs) {
        String[] args = ArrayUtils.addAll(new String[]{"-x", "local"}, extraArgs);
        return PigRunner.run(args, new TestNotificationListener("local"));
    }

    public static class TestNotificationListener implements PigProgressNotificationListener {

        private Map<String, int[]> numMap = new HashMap<String, int[]>();

        private static final int JobsToLaunch = 0;
        private static final int JobsSubmitted = 1;
        private static final int JobStarted = 2;
        private static final int JobFinished = 3;
        private String execType;

        public TestNotificationListener(String execType) {
            this.execType = execType;
        }

        public TestNotificationListener() {
            this.execType = "mr";
        }

        @Override
        public void initialPlanNotification(String id, OperatorPlan<?> plan) {
            System.out.println("id: " + id + " planNodes: " + plan.getKeys().size());
            assertNotNull(plan);
        }

        @Override
        public void launchStartedNotification(String id, int numJobsToLaunch) {
            System.out.println("id: " + id + " numJobsToLaunch: " + numJobsToLaunch);
            int[] nums = new int[4];
            numMap.put(id, nums);
            nums[JobsToLaunch] = numJobsToLaunch;
        }

        @Override
        public void jobFailedNotification(String id, JobStats jobStats) {
            System.out.println("id: " + id + " job failed: " + jobStats.getJobId());
        }

        @Override
        public void jobFinishedNotification(String id, JobStats jobStats) {
            System.out.println("id: " + id + " job finished: " + jobStats.getJobId());
            int[] nums = numMap.get(id);
            nums[JobFinished]++;
        }

        @Override
        public void jobStartedNotification(String id, String assignedJobId) {
            System.out.println("id: " + id + " job started: " + assignedJobId);
            int[] nums = numMap.get(id);
            nums[JobStarted]++;
        }

        @Override
        public void jobsSubmittedNotification(String id, int numJobsSubmitted) {
            System.out.println("id: " + id + " jobs submitted: " + numJobsSubmitted);
            int[] nums = numMap.get(id);
            nums[JobsSubmitted] += numJobsSubmitted;
        }

        @Override
        public void launchCompletedNotification(String id, int numJobsSucceeded) {
            System.out.println("id: " + id + " numJobsSucceeded: " + numJobsSucceeded);
            System.out.println("");
            int[] nums = numMap.get(id);
            assertEquals(nums[JobsToLaunch], numJobsSucceeded);
            assertEquals(nums[JobsSubmitted], numJobsSucceeded);
            assertEquals(nums[JobStarted], numJobsSucceeded);
            assertEquals(nums[JobFinished], numJobsSucceeded);
        }

        @Override
        public void outputCompletedNotification(String id, OutputStats outputStats) {
            System.out.println("id: " + id + " output done: " + outputStats.getLocation());
        }

        @Override
        public void progressUpdatedNotification(String id, int progress) {
            System.out.println("id: " + id + " progress: " + progress + "%");
        }

    }

    private void deleteAll(File d) {
        if (!d.exists()) return;
        if (d.isDirectory()) {
            for (File f : d.listFiles()) {
                deleteAll(f);
            }
        }
        d.delete();
    }
}
