PIG-4677: Display failure information on stop on failure (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1789530 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b51212..f01c312 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@
BUG FIXES
+PIG-4677: Display failure information on stop on failure (rohini)
+
PIG-5198: streaming job stuck with script failure when combined with split (knoguchi)
PIG-5183: We shall mention NATIVE instead of MAPREDUCE operator in document (daijy)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
index 03d238b..b8c40a9 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigRunner.ReturnCode;
@@ -213,9 +214,10 @@
boolean stop_on_failure =
Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
+ boolean stoppedOnFailure = false;
// jc is null only when mrp.size == 0
- while(mrp.size() != 0) {
+ while(mrp.size() != 0 && !stoppedOnFailure) {
jc = jcc.compile(mrp, grpName);
if(jc == null) {
List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -377,9 +379,9 @@
MRPigStatsUtil.accumulateStats(jc);
// if stop_on_failure is enabled, we need to stop immediately when any job has failed
- checkStopOnFailure(stop_on_failure);
+ stoppedOnFailure = stopJobsOnFailure(stop_on_failure);
// otherwise, we just display a warning message if there's any failure
- if (warn_failure && !jc.getFailedJobs().isEmpty()) {
+ if (!stop_on_failure && warn_failure && !jc.getFailedJobs().isEmpty()) {
// we don't warn again for this group of jobs
warn_failure = false;
log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
@@ -409,13 +411,15 @@
if (!jc.getFailedJobs().isEmpty() ) {
// stop if stop_on_failure is enabled
- checkStopOnFailure(stop_on_failure);
+ stoppedOnFailure = stopJobsOnFailure(stop_on_failure);
- // If we only have one store and that job fail, then we sure
- // that the job completely fail, and we shall stop dependent jobs
- for (Job job : jc.getFailedJobs()) {
- completeFailedJobsInThisRun.add(job);
- log.info("job " + job.getAssignedJobID() + " has failed! Stop running all dependent jobs");
+ if (!stoppedOnFailure) {
+ // If we only have one store and that job fails, then we are sure
+ // that the job has completely failed, and we shall stop dependent jobs
+ for (Job job : jc.getFailedJobs()) {
+ completeFailedJobsInThisRun.add(job);
+ log.info("job " + job.getAssignedJobID() + " has failed! Stop running all dependent jobs");
+ }
}
failedJobs.addAll(jc.getFailedJobs());
}
@@ -552,33 +556,38 @@
// this method.
}
}
+
+ if (stoppedOnFailure) {
+ throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
+ PigException.REMOTE_ENVIRONMENT);
+ }
return pigStats;
}
/**
- * If stop_on_failure is enabled and any job has failed, an ExecException is thrown.
+ * If stop_on_failure is enabled and any job has failed, it stops other jobs.
+ * Every job currently reported as ready or running by the job control is
+ * marked as failed through ControlledJob.failJob. This method does not throw
+ * on job failure itself; it only reports the outcome through its return value.
* @param stop_on_failure whether it's enabled.
- * @throws ExecException If stop_on_failure is enabled and any job is failed
+ * @return true if there were failed jobs and stop_on_failure is enabled
+ * @throws IOException presumably propagated from ControlledJob.failJob - confirm against the Hadoop API
+ * @throws InterruptedException presumably propagated from ControlledJob.failJob - confirm against the Hadoop API
*/
- private void checkStopOnFailure(boolean stop_on_failure) throws ExecException{
+ private boolean stopJobsOnFailure(boolean stop_on_failure) throws IOException, InterruptedException {
if (jc.getFailedJobs().isEmpty())
- return;
+ return false;
- if (stop_on_failure){
- int errCode = 6017;
- StringBuilder msg = new StringBuilder();
-
- for (int i=0; i<jc.getFailedJobs().size(); i++) {
- Job j = jc.getFailedJobs().get(i);
- msg.append("JobID: " + j.getAssignedJobID() + " Reason: " + j.getMessage());
- if (i!=jc.getFailedJobs().size()-1) {
- msg.append("\n");
+ if (stop_on_failure) {
+ List<ControlledJob> readyJobsList = jc.getReadyJobsList();
+ List<ControlledJob> runningJobList = jc.getRunningJobList();
+ // Only log and fail jobs when there is at least one ready or running job left
+ if (readyJobsList.size() > 0 || runningJobList.size() > 0) {
+ log.info("Some job(s) failed. Failing other ready and running jobs as -stop_on_failure is on");
+ for (ControlledJob job : readyJobsList) {
+ job.failJob("Failing ready job for -stop_on_failure: " + job.getMapredJobId());
+ }
+ for (ControlledJob job : runningJobList) {
+ job.failJob("Failing running job for -stop_on_failure: " + job.getMapredJobId());
}
}
-
- throw new ExecException(msg.toString(), errCode,
- PigException.REMOTE_ENVIRONMENT);
+ return true;
}
+ return false;
}
/**
diff --git a/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java b/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
index 5137390..5bce44d 100644
--- a/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
+++ b/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
@@ -38,6 +38,7 @@
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -201,6 +202,14 @@
String diagnostics = tezJob.getDiagnostics();
tezDAGStats.setErrorMsg(diagnostics);
tezDAGStats.setBackendException(new TezException(diagnostics));
+ boolean stop_on_failure =
+ Boolean.valueOf(pigContext.getProperties().getProperty("stop.on.failure", "false"));
+ if (stop_on_failure) {
+ LogUtils.writeLog("Backend error message",
+ diagnostics, pigContext.getProperties()
+ .getProperty("pig.logfile"),
+ LOG);
+ }
tezScriptState.emitJobFailedNotification(tezDAGStats);
}
tezScriptState.dagCompletedNotification(tezJob.getName(), tezDAGStats);
diff --git a/test/org/apache/pig/test/TestGrunt.java b/test/org/apache/pig/test/TestGrunt.java
index f16ff60..9f60b9a 100644
--- a/test/org/apache/pig/test/TestGrunt.java
+++ b/test/org/apache/pig/test/TestGrunt.java
@@ -29,11 +29,13 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.FilenameFilter;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -41,6 +43,7 @@
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
+import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
@@ -48,6 +51,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -55,6 +59,10 @@
import org.apache.pig.test.Util.ProcessReturnInfo;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.tools.pigscript.parser.ParseException;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.JobStats.JobState;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
@@ -967,6 +975,7 @@
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
context.getProperties().setProperty("stop.on.failure", ""+true);
+ context.getProperties().setProperty("mapreduce.map.maxattempts", "2");
String strCmd =
"rmf bar;\n"
@@ -977,6 +986,10 @@
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n"
+"B = stream A through `false`;\n"
+"store B into 'bar' using BinStorage();\n"
+ +"A1 = load '"
+ + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n"
+ + "A1 = foreach A1 generate org.apache.pig.test.TestGrunt$SleepUDF();\n"
+ +"store A1 into 'bar1' using BinStorage();\n"
+"A = load 'bar';\n"
+"store A into 'foo';\n"
+"cp pre done;\n";
@@ -994,10 +1007,65 @@
assertTrue(e.getErrorCode() == 6017);
}
+ if (Util.isMapredExecType(cluster.getExecType())) {
+ JobGraph jobGraph = PigStats.get().getJobGraph();
+ List<JobStats> failedJobs = jobGraph.getFailedJobs();
+ assertEquals(2, failedJobs.size());
+ // First job should have failed because of streaming error
+ assertTrue(failedJobs.get(0).getException().getMessage().contains(
+ "Received Error while processing the map plan: "
+ + "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'"
+ + " failed with exit status: 1"));
+ // Second job with sleep should be killed as a result of stop on failure
+ assertTrue(failedJobs.get(1).getErrorMessage().startsWith("Failing running job for -stop_on_failure"));
+ // Third job which is dependent on first should not have started
+ assertEquals(1, getUnknownJobs(jobGraph).size());
+ } else {
+ // Tez
+ JobGraph jobGraph = PigStats.get().getJobGraph();
+ List<JobStats> failedJobs = jobGraph.getFailedJobs();
+ assertEquals(1, failedJobs.size());
+ // First job should have failed because of streaming error. Sleep is also part of same job
+ assertTrue(failedJobs.get(0).getException().getMessage().contains(
+ "Received Error while processing the map plan: "
+ + "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'"
+ + " failed with exit status: 1"));
+ // Second job which is dependent on first should not have started
+ assertEquals(1, getUnknownJobs(jobGraph).size());
+ }
+
+ // Parallel job (sleep udf) should not have succeeded; it should have been killed
+ assertFalse(server.existsFile("bar1"));
+ // fs cp command at the end should not have been executed
assertFalse(server.existsFile("done"));
assertTrue(caught);
}
+    /**
+     * Collects every job in the given graph whose state is JobState.UNKNOWN.
+     *
+     * @param jobGraph job graph to scan
+     * @return the matching jobs, in the order the graph's iterator yields them
+     */
+    private ArrayList<JobStats> getUnknownJobs(JobGraph jobGraph) {
+        ArrayList<JobStats> result = new ArrayList<JobStats>();
+        for (Iterator<JobStats> it = jobGraph.iterator(); it.hasNext();) {
+            JobStats stats = it.next();
+            if (JobState.UNKNOWN == stats.getState()) {
+                result.add(stats);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Test UDF that blocks for a fixed interval on every call. The
+     * stop-on-failure test uses it for a job that is expected to be killed
+     * rather than to finish.
+     */
+    public static class SleepUDF extends EvalFunc<Integer> {
+
+        /** How long each exec() call sleeps, in milliseconds. */
+        private static final long SLEEP_MILLIS = 50000L;
+
+        /**
+         * Sleeps for SLEEP_MILLIS and produces no value.
+         *
+         * @param input ignored
+         * @return always null
+         * @throws IOException if the sleeping thread is interrupted
+         */
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(SLEEP_MILLIS);
+            } catch (InterruptedException e) {
+                // Thread.sleep clears the interrupt status when it throws; restore it
+                // so code further up the stack can still observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
+            return null;
+        }
+    }
+
@Test
public void testFsCommand() throws Throwable {
@@ -1565,7 +1633,8 @@
System.setIn(new ByteArrayInputStream(command.getBytes()));
org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
File[] partFiles = new File(".").listFiles(new FilenameFilter() {
- public boolean accept(File dir, String name) {
+ @Override
+ public boolean accept(File dir, String name) {
return name.equals("测试");
}
});