PIG-4677: Display failure information on stop on failure (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1789530 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b51212..f01c312 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@
  
 BUG FIXES
 
+PIG-4677: Display failure information on stop on failure (rohini)
+
 PIG-5198: streaming job stuck with script failure when combined with split (knoguchi)
 
 PIG-5183: We shall mention NATIVE instead of MAPREDUCE operator in document (daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
index 03d238b..b8c40a9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -213,9 +214,10 @@
 
         boolean stop_on_failure =
             Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
+        boolean stoppedOnFailure = false;
 
         // jc is null only when mrp.size == 0
-        while(mrp.size() != 0) {
+        while(mrp.size() != 0 && !stoppedOnFailure) {
             jc = jcc.compile(mrp, grpName);
             if(jc == null) {
                 List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -377,9 +379,9 @@
                     MRPigStatsUtil.accumulateStats(jc);
 
                     // if stop_on_failure is enabled, we need to stop immediately when any job has failed
-                    checkStopOnFailure(stop_on_failure);
+                    stoppedOnFailure = stopJobsOnFailure(stop_on_failure);
                     // otherwise, we just display a warning message if there's any failure
-                    if (warn_failure && !jc.getFailedJobs().isEmpty()) {
+                    if (!stop_on_failure && warn_failure && !jc.getFailedJobs().isEmpty()) {
                         // we don't warn again for this group of jobs
                         warn_failure = false;
                         log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
@@ -409,13 +411,15 @@
 
                 if (!jc.getFailedJobs().isEmpty() ) {
                     // stop if stop_on_failure is enabled
-                    checkStopOnFailure(stop_on_failure);
+                    stoppedOnFailure = stopJobsOnFailure(stop_on_failure);
 
-                    // If we only have one store and that job fail, then we sure
-                    // that the job completely fail, and we shall stop dependent jobs
-                    for (Job job : jc.getFailedJobs()) {
-                        completeFailedJobsInThisRun.add(job);
-                        log.info("job " + job.getAssignedJobID() + " has failed! Stop running all dependent jobs");
+                    if (!stoppedOnFailure) {
+                        // If we only have one store and that job fail, then we sure
+                        // that the job completely fail, and we shall stop dependent jobs
+                        for (Job job : jc.getFailedJobs()) {
+                            completeFailedJobsInThisRun.add(job);
+                            log.info("job " + job.getAssignedJobID() + " has failed! Stop running all dependent jobs");
+                        }
                     }
                     failedJobs.addAll(jc.getFailedJobs());
                 }
@@ -552,33 +556,38 @@
                 // this method.
             }
         }
+
+        if (stoppedOnFailure) {
+            throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
+                    PigException.REMOTE_ENVIRONMENT);
+        }
         return pigStats;
     }
 
     /**
-     * If stop_on_failure is enabled and any job has failed, an ExecException is thrown.
+     * If stop_on_failure is enabled and any job has failed, it stops other jobs.
      * @param stop_on_failure whether it's enabled.
-     * @throws ExecException If stop_on_failure is enabled and any job is failed
+     * @return true if there were failed jobs and stop_on_failure is enabled
      */
-    private void checkStopOnFailure(boolean stop_on_failure) throws ExecException{
+    private boolean stopJobsOnFailure(boolean stop_on_failure) throws IOException, InterruptedException {
         if (jc.getFailedJobs().isEmpty())
-            return;
+            return false;
 
-        if (stop_on_failure){
-            int errCode = 6017;
-            StringBuilder msg = new StringBuilder();
-
-            for (int i=0; i<jc.getFailedJobs().size(); i++) {
-                Job j = jc.getFailedJobs().get(i);
-                msg.append("JobID: " + j.getAssignedJobID() + " Reason: " + j.getMessage());
-                if (i!=jc.getFailedJobs().size()-1) {
-                    msg.append("\n");
+        if (stop_on_failure) {
+            List<ControlledJob> readyJobsList = jc.getReadyJobsList();
+            List<ControlledJob> runningJobList = jc.getRunningJobList();
+            if (readyJobsList.size() > 0 || runningJobList.size() > 0) {
+                log.info("Some job(s) failed. Failing other ready and running jobs as -stop_on_failure is on");
+                for (ControlledJob job : readyJobsList) {
+                    job.failJob("Failing ready job for -stop_on_failure: " + job.getMapredJobId());
+                }
+                for (ControlledJob job : runningJobList) {
+                    job.failJob("Failing running job for -stop_on_failure: " + job.getMapredJobId());
                 }
             }
-
-            throw new ExecException(msg.toString(), errCode,
-                    PigException.REMOTE_ENVIRONMENT);
+            return true;
         }
+        return false;
     }
 
     /**
diff --git a/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java b/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
index 5137390..5bce44d 100644
--- a/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
+++ b/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
@@ -38,6 +38,7 @@
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -201,6 +202,14 @@
                 String diagnostics = tezJob.getDiagnostics();
                 tezDAGStats.setErrorMsg(diagnostics);
                 tezDAGStats.setBackendException(new TezException(diagnostics));
+                boolean stop_on_failure =
+                        Boolean.valueOf(pigContext.getProperties().getProperty("stop.on.failure", "false"));
+                if (stop_on_failure) {
+                    LogUtils.writeLog("Backend error message",
+                            diagnostics, pigContext.getProperties()
+                                    .getProperty("pig.logfile"),
+                            LOG);
+                }
                 tezScriptState.emitJobFailedNotification(tezDAGStats);
             }
             tezScriptState.dagCompletedNotification(tezJob.getName(), tezDAGStats);
diff --git a/test/org/apache/pig/test/TestGrunt.java b/test/org/apache/pig/test/TestGrunt.java
index f16ff60..9f60b9a 100644
--- a/test/org/apache/pig/test/TestGrunt.java
+++ b/test/org/apache/pig/test/TestGrunt.java
@@ -29,11 +29,13 @@
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.FilenameFilter;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
@@ -41,6 +43,7 @@
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.PatternLayout;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
@@ -48,6 +51,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -55,6 +59,10 @@
 import org.apache.pig.test.Util.ProcessReturnInfo;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.pigscript.parser.ParseException;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.JobStats.JobState;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
@@ -967,6 +975,7 @@
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         context.getProperties().setProperty("stop.on.failure", ""+true);
+        context.getProperties().setProperty("mapreduce.map.maxattempts", "2");
 
         String strCmd =
             "rmf bar;\n"
@@ -977,6 +986,10 @@
             + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n"
             +"B = stream A through `false`;\n"
             +"store B into 'bar' using BinStorage();\n"
+            +"A1 = load '"
+            + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n"
+            + "A1 = foreach A1 generate org.apache.pig.test.TestGrunt$SleepUDF();\n"
+            +"store A1 into 'bar1' using BinStorage();\n"
             +"A = load 'bar';\n"
             +"store A into 'foo';\n"
             +"cp pre done;\n";
@@ -994,10 +1007,65 @@
             assertTrue(e.getErrorCode() == 6017);
         }
 
+        if (Util.isMapredExecType(cluster.getExecType())) {
+            JobGraph jobGraph = PigStats.get().getJobGraph();
+            List<JobStats> failedJobs = jobGraph.getFailedJobs();
+            assertEquals(2, failedJobs.size());
+            // First job should have failed because of streaming error
+            assertTrue(failedJobs.get(0).getException().getMessage().contains(
+                    "Received Error while processing the map plan: "
+                    + "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'"
+                    + " failed with exit status: 1"));
+            // Second job with sleep should be killed as a result of stop on failure
+            assertTrue(failedJobs.get(1).getErrorMessage().startsWith("Failing running job for -stop_on_failure"));
+            // Third job which is dependent on first should not have started
+            assertEquals(1, getUnknownJobs(jobGraph).size());
+        } else {
+            // Tez
+            JobGraph jobGraph = PigStats.get().getJobGraph();
+            List<JobStats> failedJobs = jobGraph.getFailedJobs();
+            assertEquals(1, failedJobs.size());
+            // First job should have failed because of streaming error. Sleep is also part of same job
+            assertTrue(failedJobs.get(0).getException().getMessage().contains(
+                    "Received Error while processing the map plan: "
+                    + "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'"
+                    + " failed with exit status: 1"));
+            // Second job which is dependent on first should not have started
+            assertEquals(1, getUnknownJobs(jobGraph).size());
+        }
+
+        // Parallel job (sleep udf) should have not succeeded and been killed
+        assertFalse(server.existsFile("bar1"));
+        // fs cp command at the end should not have been executed
         assertFalse(server.existsFile("done"));
         assertTrue(caught);
     }
 
+    private ArrayList<JobStats> getUnknownJobs(JobGraph jobGraph) {
+        ArrayList<JobStats> unknown = new ArrayList<JobStats>();
+        Iterator<JobStats> iter = jobGraph.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            if (js.getState() == JobState.UNKNOWN) {
+                unknown.add(js);
+            }
+        }
+        return unknown;
+    }
+
+    public static class SleepUDF extends EvalFunc<Integer> {
+
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(50000);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return null;
+        }
+    }
+
     @Test
     public void testFsCommand() throws Throwable {
 
@@ -1565,7 +1633,8 @@
         System.setIn(new ByteArrayInputStream(command.getBytes()));
         org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
         File[] partFiles = new File(".").listFiles(new FilenameFilter() {
-            public boolean accept(File dir, String name) { 
+            @Override
+            public boolean accept(File dir, String name) {
             return name.equals("测试");
         }
         });