Oozie 3.3.0-rc0 release
git-svn-id: https://svn.apache.org/repos/asf/oozie/tags/release-3.3.0-rc0@1412028 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 68bd301..09050c4 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -59,15 +59,17 @@
private static class ErrorInfo {
ActionExecutorException.ErrorType errorType;
String errorCode;
+ Class<?> errorClass;
- private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) {
+ private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
this.errorType = errorType;
this.errorCode = errorCode;
+ this.errorClass = errorClass;
}
}
private static boolean initMode = false;
- private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>();
+ private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
/**
* Context information passed to the ActionExecutor methods.
@@ -266,7 +268,7 @@
*/
public void initActionType() {
XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
- ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
+ ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
}
/**
@@ -310,9 +312,9 @@
throw new IllegalStateException("Error, action type info locked");
}
try {
- Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
- Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
- executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
+ Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
+ Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
+ executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
}
catch (ClassNotFoundException cnfe) {
XLog.getLog(getClass()).warn(
@@ -383,16 +385,41 @@
if (ex instanceof ActionExecutorException) {
return (ActionExecutorException) ex;
}
- for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) {
- if (errorInfo.getKey().isInstance(ex)) {
- return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode,
- "{0}", ex.getMessage(), ex);
+
+ ActionExecutorException aee = null;
+ // Check the cause of the exception first
+ if (ex.getCause() != null) {
+ aee = convertExceptionHelper(ex.getCause());
+ }
+ // If the cause isn't registered or doesn't exist, check the exception itself
+ if (aee == null) {
+ aee = convertExceptionHelper(ex);
+ // If the exception itself isn't registered either, then just create a new ActionExecutorException
+ if (aee == null) {
+ String exClass = ex.getClass().getName();
+ String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
+ aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
}
}
- String errorCode = ex.getClass().getName();
- errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
- return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(),
- ex);
+ return aee;
+ }
+
+ private ActionExecutorException convertExceptionHelper(Throwable ex) {
+ Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
+ // Check if we have registered ex
+ ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
+ if (classErrorInfo != null) {
+ return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
+ }
+ // Else, check if a parent class of ex is registered
+ else {
+ for (ErrorInfo errorInfo : executorErrorInfo.values()) {
+ if (errorInfo.errorClass.isInstance(ex)) {
+ return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
+ }
+ }
+ }
+ return null;
}
/**
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 25ab1aa..6db58c5 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -28,6 +28,7 @@
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowAction.Status;
import org.apache.oozie.client.rest.JsonBean;
@@ -158,6 +159,9 @@
ActionExecutorContext context = null;
try {
boolean isRetry = false;
+ if (wfAction.getRetries() > 0) {
+ isRetry = true;
+ }
boolean isUserRetry = false;
context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
incrActionCounter(wfAction.getType(), 1);
@@ -198,6 +202,14 @@
case ERROR:
handleUserRetry(wfAction);
break;
+ case TRANSIENT: // retry N times, then suspend workflow
+ if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
+ handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
+ wfAction.setPendingAge(new Date());
+ wfAction.setRetries(0);
+ wfAction.setStartTime(null);
+ }
+ break;
}
wfAction.setLastCheckTime(new Date());
updateList = new ArrayList<JsonBean>();
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
index a07b6ca..8e71552 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
@@ -17,6 +17,8 @@
*/
package org.apache.oozie.command.wf;
+import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -24,15 +26,22 @@
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -79,6 +88,19 @@
if (action.isPending()) {
if (action.getStatus() == WorkflowActionBean.Status.PREP
|| action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
+ // When resuming a workflow that was programmatically suspended (via ActionCheckXCommand) because of
+ // a repeated transient error, we have to clean up the action dir
+ if (!action.getType().equals(StartActionExecutor.TYPE) && // The control actions have invalid
+ !action.getType().equals(ForkActionExecutor.TYPE) && // action dir paths because they
+ !action.getType().equals(JoinActionExecutor.TYPE) && // contain ":" (colons)
+ !action.getType().equals(KillActionExecutor.TYPE) &&
+ !action.getType().equals(EndActionExecutor.TYPE)) {
+ ActionExecutorContext context =
+ new ActionXCommand.ActionExecutorContext(workflow, action, false, false);
+ if (context.getAppFileSystem().exists(context.getActionDir())) {
+ context.getAppFileSystem().delete(context.getActionDir(), true);
+ }
+ }
queue(new ActionStartXCommand(action.getId(), action.getType()));
}
else {
@@ -118,6 +140,15 @@
catch (JPAExecutorException e) {
throw new CommandException(e);
}
+ catch (HadoopAccessorException e) {
+ throw new CommandException(e);
+ }
+ catch (IOException e) {
+ throw new CommandException(ErrorCode.E0902, e);
+ }
+ catch (URISyntaxException e) {
+ throw new CommandException(ErrorCode.E0902, e);
+ }
finally {
// update coordinator action
new CoordActionUpdateXCommand(workflow).call();
diff --git a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
index 83e150a..180274f 100644
--- a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.action;
+import java.io.EOFException;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.client.WorkflowAction;
@@ -146,5 +147,73 @@
fail();
}
+ cause = new EOFException(); // not registered, but subclass of IOException
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(cause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.TRANSIENT, ex.getErrorType());
+ assertEquals("IO", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ Exception rootCause = new RemoteException();
+ cause = new RuntimeException(rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.NON_TRANSIENT, ex.getErrorType());
+ assertEquals("RMI", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ rootCause = new RemoteException();
+ cause = new IOException(rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.NON_TRANSIENT, ex.getErrorType());
+ assertEquals("RMI", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ rootCause = new IOException();
+ cause = new RemoteException("x", rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.TRANSIENT, ex.getErrorType());
+ assertEquals("IO", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ rootCause = new EOFException(); // not registered, but subclass of IOException
+ cause = new RemoteException("x", rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.TRANSIENT, ex.getErrorType());
+ assertEquals("IO", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
}
}
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index b17c11b..4b013a6 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -39,6 +39,7 @@
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
@@ -237,6 +238,239 @@
}
+ public void testActionCheckTransientDuringLauncher() throws Exception {
+ services.destroy();
+ // Make the ActionCheckXCommand run more frequently so the test won't take as long
+ setSystemProperty("oozie.service.ActionCheckerService.action.check.interval", "10");
+ setSystemProperty("oozie.service.ActionCheckerService.action.check.delay", "20");
+ // Make the max number of retries lower so the test won't take as long
+ final int maxRetries = 2;
+ setSystemProperty("oozie.action.retries.max", Integer.toString(maxRetries));
+ services = new Services();
+ services.init();
+
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ WorkflowJobBean job0 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ final String jobId = job0.getId();
+ WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1", WorkflowAction.Status.PREP);
+ final String actionId = action0.getId();
+ final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(actionId);
+
+ new ActionStartXCommand(actionId, "map-reduce").call();
+ final WorkflowActionBean action1 = jpaService.execute(wfActionGetCmd);
+ String originalLauncherId = action1.getExternalId();
+
+ // At this point, the launcher job has started (but not finished)
+ // Now, shutdown the job tracker to pretend it has gone down during the launcher job
+ executeWhileJobTrackerIsShutdown(new ShutdownJobTrackerExecutable() {
+ @Override
+ public void execute() throws Exception {
+ assertEquals(0, action1.getRetries());
+ new ActionCheckXCommand(actionId).call();
+
+ waitFor(30 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+ return (action1a.getRetries() > 0);
+ }
+ });
+ waitFor(180 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+ return (action1a.getRetries() == 0);
+ }
+ });
+ WorkflowActionBean action1b = jpaService.execute(wfActionGetCmd);
+ assertEquals(0, action1b.getRetries());
+ assertEquals("START_MANUAL", action1b.getStatusStr());
+
+ WorkflowJobBean job1 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("SUSPENDED", job1.getStatusStr());
+
+ // At this point, the action has gotten a transient error, even after maxRetries tries so the workflow has been
+ // SUSPENDED
+ }
+ });
+ // Now that the job tracker is back up, resume the workflow (which will restart the current action)
+ // It should now continue and finish with SUCCEEDED
+ new ResumeXCommand(jobId).call();
+ WorkflowJobBean job2 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("RUNNING", job2.getStatusStr());
+
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job2, action1, false, false);
+ WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
+ MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+ JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf()));
+ String user = conf.get("user.name");
+ JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action3 = jpaService.execute(wfActionGetCmd);
+ String launcherId = action3.getExternalId();
+ assertFalse(originalLauncherId.equals(launcherId));
+
+ final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return launcherJob.isComplete();
+ }
+ });
+ assertTrue(launcherJob.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
+ String mapperId = action4.getExternalId();
+
+ assertFalse(launcherId.equals(mapperId));
+
+ final RunningJob mrJob = jobClient.getJob(JobID.forName(mapperId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return mrJob.isComplete();
+ }
+ });
+ assertTrue(mrJob.isSuccessful());
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action5 = jpaService.execute(wfActionGetCmd);
+
+ assertEquals("SUCCEEDED", action5.getExternalStatus());
+ }
+
+ public void testActionCheckTransientDuringMRAction() throws Exception {
+ services.destroy();
+ // Make the ActionCheckXCommand run more frequently so the test won't take as long
+ setSystemProperty("oozie.service.ActionCheckerService.action.check.interval", "10");
+ setSystemProperty("oozie.service.ActionCheckerService.action.check.delay", "20");
+ // Make the max number of retries lower so the test won't take as long
+ final int maxRetries = 2;
+ setSystemProperty("oozie.action.retries.max", Integer.toString(maxRetries));
+ services = new Services();
+ services.init();
+
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ WorkflowJobBean job0 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ final String jobId = job0.getId();
+ WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1", WorkflowAction.Status.PREP);
+ final String actionId = action0.getId();
+ final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(actionId);
+
+ new ActionStartXCommand(actionId, "map-reduce").call();
+ final WorkflowActionBean action1 = jpaService.execute(wfActionGetCmd);
+ String originalLauncherId = action1.getExternalId();
+
+ ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job0, action1, false, false);
+ MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+ JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
+ String user = conf.get("user.name");
+ JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+
+ final RunningJob launcherJob = jobClient.getJob(JobID.forName(originalLauncherId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return launcherJob.isComplete();
+ }
+ });
+ assertTrue(launcherJob.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+
+ new ActionCheckXCommand(action1.getId()).call();
+ WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
+ String originalMapperId = action2.getExternalId();
+
+ assertFalse(originalLauncherId.equals(originalMapperId));
+
+ // At this point, the launcher job has finished and the map-reduce action has started (but not finished)
+ // Now, shutdown the job tracker to pretend it has gone down during the map-reduce job
+ executeWhileJobTrackerIsShutdown(new ShutdownJobTrackerExecutable() {
+ @Override
+ public void execute() throws Exception {
+ assertEquals(0, action1.getRetries());
+ new ActionCheckXCommand(actionId).call();
+
+ waitFor(30 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+ return (action1a.getRetries() > 0);
+ }
+ });
+ waitFor(180 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+ return (action1a.getRetries() == 0);
+ }
+ });
+ WorkflowActionBean action1b = jpaService.execute(wfActionGetCmd);
+ assertEquals(0, action1b.getRetries());
+ assertEquals("START_MANUAL", action1b.getStatusStr());
+
+ WorkflowJobBean job1 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("SUSPENDED", job1.getStatusStr());
+
+ // At this point, the action has gotten a transient error, even after maxRetries tries so the workflow has been
+ // SUSPENDED
+ }
+ });
+ // Now that the job tracker is back up, resume the workflow (which will restart the current action)
+ // It should now continue and finish with SUCCEEDED
+ new ResumeXCommand(jobId).call();
+ WorkflowJobBean job2 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("RUNNING", job2.getStatusStr());
+
+ sleep(500);
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action3 = jpaService.execute(wfActionGetCmd);
+ String launcherId = action3.getExternalId();
+
+ assertFalse(originalLauncherId.equals(launcherId));
+ assertFalse(originalMapperId.equals(launcherId));
+
+ final RunningJob launcherJob2 = jobClient.getJob(JobID.forName(launcherId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return launcherJob2.isComplete();
+ }
+ });
+ assertTrue(launcherJob2.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob2));
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
+ String mapperId = action4.getExternalId();
+ assertFalse(originalMapperId.equals(mapperId));
+
+ assertFalse(launcherId.equals(mapperId));
+
+ final RunningJob mrJob = jobClient.getJob(JobID.forName(mapperId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return mrJob.isComplete();
+ }
+ });
+ assertTrue(mrJob.isSuccessful());
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action5 = jpaService.execute(wfActionGetCmd);
+
+ assertEquals("SUCCEEDED", action5.getExternalStatus());
+ }
+
@Override
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status) throws Exception {
WorkflowActionBean action = createWorkflowActionSetPending(wfId, status);
@@ -281,6 +515,7 @@
String actionXml = "<map-reduce>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
"<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<prepare><delete path=\"" + outputDir.toString() + "\"/></prepare>" +
"<configuration>" +
"<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() +
"</value></property>" +
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 7bcff6b..fe8c867 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -805,5 +805,38 @@
return jobConf;
}
+ /**
+ * A 'closure' used by {@link XTestCase#executeWhileJobTrackerIsShutdown} method.
+ */
+ public static interface ShutdownJobTrackerExecutable {
+
+ /**
+ * Execute some code
+ *
+ * @throws Exception thrown if the executed code throws an exception.
+ */
+ public void execute() throws Exception;
+ }
+
+ /**
+ * Execute some code, expressed via a {@link ShutdownJobTrackerExecutable}, while the JobTracker is shut down. Once the code has
+ * finished, the JobTracker is restarted (even if an exception occurs); any such exception is rethrown wrapped in a RuntimeException.
+ *
+ * @param executable The ShutdownJobTrackerExecutable to execute while the JobTracker is shut down
+ */
+ protected void executeWhileJobTrackerIsShutdown(ShutdownJobTrackerExecutable executable) {
+ mrCluster.stopJobTracker();
+ Exception ex = null;
+ try {
+ executable.execute();
+ } catch (Exception e) {
+ ex = e;
+ } finally {
+ mrCluster.startJobTracker();
+ }
+ if (ex != null) {
+ throw new RuntimeException(ex);
+ }
+ }
}
diff --git a/core/src/test/resources/graphWF.xml b/core/src/test/resources/graphWF.xml
index 91e3f2e..6a7b042 100644
--- a/core/src/test/resources/graphWF.xml
+++ b/core/src/test/resources/graphWF.xml
@@ -1,6 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<<<<<<< HEAD
-=======
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -18,7 +16,6 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
->>>>>>> 028ab12... OOZIE-1013 Build failing as the license header comment is appearing before xml declaration in some files (virag)
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sm3-segment-3908-251483">
<start to="or_0_1"/>
<kill name="kill">
diff --git a/core/src/test/resources/invalidGraphWF.xml b/core/src/test/resources/invalidGraphWF.xml
index 10f35ee..8103bca 100644
--- a/core/src/test/resources/invalidGraphWF.xml
+++ b/core/src/test/resources/invalidGraphWF.xml
@@ -1,6 +1,4 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<<<<<<< HEAD
-=======
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -18,7 +16,6 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
->>>>>>> 028ab12... OOZIE-1013 Build failing as the license header comment is appearing before xml declaration in some files (virag)
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sm3-segment-3908-251483">
<start to="or_0_1"/>
<kill name="kill">
diff --git a/release-log.txt b/release-log.txt
index f558ebe..2e6cbca 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,7 @@
--- Oozie 3.3.0 release (unreleased)
+-- Oozie 3.3.0 release
+OOZIE-1005 Tests from OOZIE-994 use wrong condition in waitFor (rkanter via virag)
+OOZIE-994 ActionCheckXCommand does not handle failures properly (rkanter via mohammad)
OOZIE-1058 ACL modify-job should not be hardcoded to group name(mona via mohammad)
OOZIE-1052 HadoopAccessorService.createFileSystem throws exception in map-reduce action, failing workflow.(ryota via mohammad).
OOZIE-1060 bump hadoop 2.X version to 2.0.2-alpha (rvs via tucu)