Oozie 3.3.0-rc0 release

git-svn-id: https://svn.apache.org/repos/asf/oozie/tags/release-3.3.0-rc0@1412028 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 68bd301..09050c4 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -59,15 +59,17 @@
     private static class ErrorInfo {
         ActionExecutorException.ErrorType errorType;
         String errorCode;
+        Class<?> errorClass;
 
-        private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode) {
+        private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
             this.errorType = errorType;
             this.errorCode = errorCode;
+            this.errorClass = errorClass;
         }
     }
 
     private static boolean initMode = false;
-    private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<Class, ErrorInfo>>();
+    private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
 
     /**
      * Context information passed to the ActionExecutor methods.
@@ -266,7 +268,7 @@
      */
     public void initActionType() {
         XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
-        ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
+        ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
     }
 
     /**
@@ -310,9 +312,9 @@
             throw new IllegalStateException("Error, action type info locked");
         }
         try {
-            Class klass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
-            Map<Class, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
-            executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
+            Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
+            Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
+            executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
         }
         catch (ClassNotFoundException cnfe) {
             XLog.getLog(getClass()).warn(
@@ -383,16 +385,41 @@
         if (ex instanceof ActionExecutorException) {
             return (ActionExecutorException) ex;
         }
-        for (Map.Entry<Class, ErrorInfo> errorInfo : ERROR_INFOS.get(getType()).entrySet()) {
-            if (errorInfo.getKey().isInstance(ex)) {
-                return new ActionExecutorException(errorInfo.getValue().errorType, errorInfo.getValue().errorCode,
-                                                   "{0}", ex.getMessage(), ex);
+
+        ActionExecutorException aee = null;
+        // Check the cause of the exception first
+        if (ex.getCause() != null) {
+            aee = convertExceptionHelper(ex.getCause());
+        }
+        // If the cause isn't registered or doesn't exist, check the exception itself
+        if (aee == null) {
+            aee = convertExceptionHelper(ex);
+            // If the cause isn't registered either, then just create a new ActionExecutorException
+            if (aee == null) {
+                String exClass = ex.getClass().getName();
+                String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
+                aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
             }
         }
-        String errorCode = ex.getClass().getName();
-        errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
-        return new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(),
-                                           ex);
+        return aee;
+    }
+
+    private ActionExecutorException convertExceptionHelper(Throwable ex) {
+        Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
+        // Check if we have registered ex
+        ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
+        if (classErrorInfo != null) {
+            return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
+        }
+        // Else, check if a parent class of ex is registered
+        else {
+            for (ErrorInfo errorInfo : executorErrorInfo.values()) {
+                if (errorInfo.errorClass.isInstance(ex)) {
+                    return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
+                }
+            }
+        }
+        return null;
     }
 
     /**
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
index 25ab1aa..6db58c5 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
@@ -28,6 +28,7 @@
 import org.apache.oozie.XException;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.WorkflowAction.Status;
 import org.apache.oozie.client.rest.JsonBean;
@@ -158,6 +159,9 @@
         ActionExecutorContext context = null;
         try {
             boolean isRetry = false;
+            if (wfAction.getRetries() > 0) {
+                isRetry = true;
+            }
             boolean isUserRetry = false;
             context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
             incrActionCounter(wfAction.getType(), 1);
@@ -198,6 +202,14 @@
                 case ERROR:
                     handleUserRetry(wfAction);
                     break;
+                case TRANSIENT:                 // retry N times, then suspend workflow
+                    if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
+                        handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
+                        wfAction.setPendingAge(new Date());
+                        wfAction.setRetries(0);
+                        wfAction.setStartTime(null);
+                    }
+                    break;
             }
             wfAction.setLastCheckTime(new Date());
             updateList = new ArrayList<JsonBean>();
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
index a07b6ca..8e71552 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
@@ -17,6 +17,8 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -24,15 +26,22 @@
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.InstrumentUtils;
@@ -79,6 +88,19 @@
                     if (action.isPending()) {
                         if (action.getStatus() == WorkflowActionBean.Status.PREP
                                 || action.getStatus() == WorkflowActionBean.Status.START_MANUAL) {
+                            // When resuming a workflow that was programatically suspended (via ActionCheckXCommand) because of
+                            // a repeated transient error, we have to clean up the action dir
+                            if (!action.getType().equals(StartActionExecutor.TYPE) &&       // The control actions have invalid
+                                !action.getType().equals(ForkActionExecutor.TYPE) &&        // action dir paths because they
+                                !action.getType().equals(JoinActionExecutor.TYPE) &&        // contain ":" (colons)
+                                !action.getType().equals(KillActionExecutor.TYPE) &&
+                                !action.getType().equals(EndActionExecutor.TYPE)) {
+                                ActionExecutorContext context =
+                                        new ActionXCommand.ActionExecutorContext(workflow, action, false, false);
+                                if (context.getAppFileSystem().exists(context.getActionDir())) {
+                                    context.getAppFileSystem().delete(context.getActionDir(), true);
+                                }
+                            }
                             queue(new ActionStartXCommand(action.getId(), action.getType()));
                         }
                         else {
@@ -118,6 +140,15 @@
         catch (JPAExecutorException e) {
             throw new CommandException(e);
         }
+        catch (HadoopAccessorException e) {
+            throw new CommandException(e);
+        }
+        catch (IOException e) {
+            throw new CommandException(ErrorCode.E0902, e);
+        }
+        catch (URISyntaxException e) {
+            throw new CommandException(ErrorCode.E0902, e);
+        }
         finally {
             // update coordinator action
             new CoordActionUpdateXCommand(workflow).call();
diff --git a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
index 83e150a..180274f 100644
--- a/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.action;
 
+import java.io.EOFException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.client.WorkflowAction;
@@ -146,5 +147,73 @@
             fail();
         }
 
+        cause = new EOFException();     // not registered, but subclass of IOException
+        try {
+            throw ae.convertException(cause);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals(cause, ex.getCause());
+            assertEquals(ActionExecutorException.ErrorType.TRANSIENT, ex.getErrorType());
+            assertEquals("IO", ex.getErrorCode());
+        }
+        catch (Exception ex) {
+            fail();
+        }
+
+        Exception rootCause = new RemoteException();
+        cause = new RuntimeException(rootCause);
+        try {
+            throw ae.convertException(cause);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals(rootCause, ex.getCause());
+            assertEquals(ActionExecutorException.ErrorType.NON_TRANSIENT, ex.getErrorType());
+            assertEquals("RMI", ex.getErrorCode());
+        }
+        catch (Exception ex) {
+            fail();
+        }
+
+        rootCause = new RemoteException();
+        cause = new IOException(rootCause);
+        try {
+            throw ae.convertException(cause);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals(rootCause, ex.getCause());
+            assertEquals(ActionExecutorException.ErrorType.NON_TRANSIENT, ex.getErrorType());
+            assertEquals("RMI", ex.getErrorCode());
+        }
+        catch (Exception ex) {
+            fail();
+        }
+
+        rootCause = new IOException();
+        cause = new RemoteException("x", rootCause);
+        try {
+            throw ae.convertException(cause);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals(rootCause, ex.getCause());
+            assertEquals(ActionExecutorException.ErrorType.TRANSIENT, ex.getErrorType());
+            assertEquals("IO", ex.getErrorCode());
+        }
+        catch (Exception ex) {
+            fail();
+        }
+
+        rootCause = new EOFException();     // not registered, but subclass of IOException
+        cause = new RemoteException("x", rootCause);
+        try {
+            throw ae.convertException(cause);
+        }
+        catch (ActionExecutorException ex) {
+            assertEquals(rootCause, ex.getCause());
+            assertEquals(ActionExecutorException.ErrorType.TRANSIENT, ex.getErrorType());
+            assertEquals("IO", ex.getErrorCode());
+        }
+        catch (Exception ex) {
+            fail();
+        }
     }
 }
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index b17c11b..4b013a6 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -39,6 +39,7 @@
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JPAService;
@@ -237,6 +238,239 @@
 
     }
 
+    public void testActionCheckTransientDuringLauncher() throws Exception {
+        services.destroy();
+        // Make the ActionCheckXCommand run more frequently so the test won't take as long
+        setSystemProperty("oozie.service.ActionCheckerService.action.check.interval", "10");
+        setSystemProperty("oozie.service.ActionCheckerService.action.check.delay", "20");
+        // Make the max number of retries lower so the test won't take as long
+        final int maxRetries = 2;
+        setSystemProperty("oozie.action.retries.max", Integer.toString(maxRetries));
+        services = new Services();
+        services.init();
+
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        WorkflowJobBean job0 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        final String jobId = job0.getId();
+        WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1", WorkflowAction.Status.PREP);
+        final String actionId = action0.getId();
+        final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(actionId);
+
+        new ActionStartXCommand(actionId, "map-reduce").call();
+        final WorkflowActionBean action1 = jpaService.execute(wfActionGetCmd);
+        String originalLauncherId = action1.getExternalId();
+
+        // At this point, the launcher job has started (but not finished)
+        // Now, shutdown the job tracker to pretend it has gone down during the launcher job
+        executeWhileJobTrackerIsShutdown(new ShutdownJobTrackerExecutable() {
+            @Override
+            public void execute() throws Exception {
+                assertEquals(0, action1.getRetries());
+                new ActionCheckXCommand(actionId).call();
+
+                waitFor(30 * 1000, new Predicate() {
+                    @Override
+                    public boolean evaluate() throws Exception {
+                        WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+                        return (action1a.getRetries() > 0);
+                    }
+                });
+                waitFor(180 * 1000, new Predicate() {
+                    @Override
+                    public boolean evaluate() throws Exception {
+                        WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+                        return (action1a.getRetries() == 0);
+                    }
+                });
+                WorkflowActionBean action1b = jpaService.execute(wfActionGetCmd);
+                assertEquals(0, action1b.getRetries());
+                assertEquals("START_MANUAL", action1b.getStatusStr());
+
+                WorkflowJobBean job1 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+                assertEquals("SUSPENDED", job1.getStatusStr());
+
+                // At this point, the action has gotten a transient error, even after maxRetries tries so the workflow has been
+                // SUSPENDED
+            }
+        });
+        // Now, lets bring the job tracker back up and resume the workflow (which will restart the current action)
+        // It should now continue and finish with SUCCEEDED
+        new ResumeXCommand(jobId).call();
+        WorkflowJobBean job2 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+        assertEquals("RUNNING", job2.getStatusStr());
+
+        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job2, action1, false, false);
+        WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
+        MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf()));
+        String user = conf.get("user.name");
+        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+
+        new ActionCheckXCommand(actionId).call();
+        WorkflowActionBean action3 = jpaService.execute(wfActionGetCmd);
+        String launcherId = action3.getExternalId();
+        assertFalse(originalLauncherId.equals(launcherId));
+
+        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+
+        waitFor(120 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+        assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+
+        new ActionCheckXCommand(actionId).call();
+        WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
+        String mapperId = action4.getExternalId();
+
+        assertFalse(launcherId.equals(mapperId));
+
+        final RunningJob mrJob = jobClient.getJob(JobID.forName(mapperId));
+
+        waitFor(120 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return mrJob.isComplete();
+            }
+        });
+        assertTrue(mrJob.isSuccessful());
+
+        new ActionCheckXCommand(actionId).call();
+        WorkflowActionBean action5 = jpaService.execute(wfActionGetCmd);
+
+        assertEquals("SUCCEEDED", action5.getExternalStatus());
+    }
+
+    public void testActionCheckTransientDuringMRAction() throws Exception {
+        services.destroy();
+        // Make the ActionCheckXCommand run more frequently so the test won't take as long
+        setSystemProperty("oozie.service.ActionCheckerService.action.check.interval", "10");
+        setSystemProperty("oozie.service.ActionCheckerService.action.check.delay", "20");
+        // Make the max number of retries lower so the test won't take as long
+        final int maxRetries = 2;
+        setSystemProperty("oozie.action.retries.max", Integer.toString(maxRetries));
+        services = new Services();
+        services.init();
+
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        WorkflowJobBean job0 = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        final String jobId = job0.getId();
+        WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1", WorkflowAction.Status.PREP);
+        final String actionId = action0.getId();
+        final WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(actionId);
+
+        new ActionStartXCommand(actionId, "map-reduce").call();
+        final WorkflowActionBean action1 = jpaService.execute(wfActionGetCmd);
+        String originalLauncherId = action1.getExternalId();
+
+        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job0, action1, false, false);
+        MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
+        String user = conf.get("user.name");
+        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+
+        final RunningJob launcherJob = jobClient.getJob(JobID.forName(originalLauncherId));
+
+        waitFor(120 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+        assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+
+        new ActionCheckXCommand(action1.getId()).call();
+        WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
+        String originalMapperId = action2.getExternalId();
+
+        assertFalse(originalLauncherId.equals(originalMapperId));
+
+        // At this point, the launcher job has finished and the map-reduce action has started (but not finished)
+        // Now, shutdown the job tracker to pretend it has gone down during the map-reduce job
+        executeWhileJobTrackerIsShutdown(new ShutdownJobTrackerExecutable() {
+            @Override
+            public void execute() throws Exception {
+                assertEquals(0, action1.getRetries());
+                new ActionCheckXCommand(actionId).call();
+
+                waitFor(30 * 1000, new Predicate() {
+                    @Override
+                    public boolean evaluate() throws Exception {
+                        WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+                        return (action1a.getRetries() > 0);
+                    }
+                });
+                waitFor(180 * 1000, new Predicate() {
+                    @Override
+                    public boolean evaluate() throws Exception {
+                        WorkflowActionBean action1a = jpaService.execute(wfActionGetCmd);
+                        return (action1a.getRetries() == 0);
+                    }
+                });
+                WorkflowActionBean action1b = jpaService.execute(wfActionGetCmd);
+                assertEquals(0, action1b.getRetries());
+                assertEquals("START_MANUAL", action1b.getStatusStr());
+
+                WorkflowJobBean job1 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+                assertEquals("SUSPENDED", job1.getStatusStr());
+
+                // At this point, the action has gotten a transient error, even after maxRetries tries so the workflow has been
+                // SUSPENDED
+            }
+        });
+        // Now, lets bring the job tracker back up and resume the workflow (which will restart the current action)
+        // It should now continue and finish with SUCCEEDED
+        new ResumeXCommand(jobId).call();
+        WorkflowJobBean job2 = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
+        assertEquals("RUNNING", job2.getStatusStr());
+
+        sleep(500);
+
+        new ActionCheckXCommand(actionId).call();
+        WorkflowActionBean action3 = jpaService.execute(wfActionGetCmd);
+        String launcherId = action3.getExternalId();
+
+        assertFalse(originalLauncherId.equals(launcherId));
+        assertFalse(originalMapperId.equals(launcherId));
+
+        final RunningJob launcherJob2 = jobClient.getJob(JobID.forName(launcherId));
+
+        waitFor(120 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return launcherJob2.isComplete();
+            }
+        });
+        assertTrue(launcherJob2.isSuccessful());
+        assertTrue(LauncherMapper.hasIdSwap(launcherJob2));
+
+        new ActionCheckXCommand(actionId).call();
+        WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
+        String mapperId = action4.getExternalId();
+        assertFalse(originalMapperId.equals(mapperId));
+
+        assertFalse(launcherId.equals(mapperId));
+
+        final RunningJob mrJob = jobClient.getJob(JobID.forName(mapperId));
+
+        waitFor(120 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return mrJob.isComplete();
+            }
+        });
+        assertTrue(mrJob.isSuccessful());
+
+        new ActionCheckXCommand(actionId).call();
+        WorkflowActionBean action5 = jpaService.execute(wfActionGetCmd);
+
+        assertEquals("SUCCEEDED", action5.getExternalStatus());
+    }
+
     @Override
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status) throws Exception {
         WorkflowActionBean action = createWorkflowActionSetPending(wfId, status);
@@ -281,6 +515,7 @@
         String actionXml = "<map-reduce>" +
         "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
         "<name-node>" + getNameNodeUri() + "</name-node>" +
+        "<prepare><delete path=\"" + outputDir.toString() + "\"/></prepare>" +
         "<configuration>" +
         "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() +
         "</value></property>" +
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 7bcff6b..fe8c867 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -805,5 +805,38 @@
         return jobConf;
     }
 
+    /**
+     * A 'closure' used by {@link XTestCase#executeWhileJobTrackerIsShutdown} method.
+     */
+    public static interface ShutdownJobTrackerExecutable {
+
+        /**
+         * Execute some code
+         *
+         * @throws Exception thrown if the executed code throws an exception.
+         */
+        public void execute() throws Exception;
+    }
+
+    /**
+     * Execute some code, expressed via a {@link ShutdownJobTrackerExecutable}, while the JobTracker is shutdown. Once the code has
+     * finished, the JobTracker is restarted (even if an exception occurs).
+     *
+     * @param executable The ShutdownJobTrackerExecutable to execute while the JobTracker is shutdown
+     */
+    protected void executeWhileJobTrackerIsShutdown(ShutdownJobTrackerExecutable executable) {
+        mrCluster.stopJobTracker();
+        Exception ex = null;
+        try {
+            executable.execute();
+        } catch (Exception e) {
+            ex = e;
+        } finally {
+            mrCluster.startJobTracker();
+        }
+        if (ex != null) {
+            throw new RuntimeException(ex);
+        }
+    }
 }
 
diff --git a/core/src/test/resources/graphWF.xml b/core/src/test/resources/graphWF.xml
index 91e3f2e..6a7b042 100644
--- a/core/src/test/resources/graphWF.xml
+++ b/core/src/test/resources/graphWF.xml
@@ -1,6 +1,4 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<<<<<<< HEAD
-=======
 <!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
@@ -18,7 +16,6 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
->>>>>>> 028ab12... OOZIE-1013 Build failing as the license header comment is appearing before xml declaration in some files (virag)
 <workflow-app xmlns="uri:oozie:workflow:0.2" name="sm3-segment-3908-251483">
     <start to="or_0_1"/>
     <kill name="kill">
diff --git a/core/src/test/resources/invalidGraphWF.xml b/core/src/test/resources/invalidGraphWF.xml
index 10f35ee..8103bca 100644
--- a/core/src/test/resources/invalidGraphWF.xml
+++ b/core/src/test/resources/invalidGraphWF.xml
@@ -1,6 +1,4 @@
 <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<<<<<<< HEAD
-=======
 <!--
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
@@ -18,7 +16,6 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
->>>>>>> 028ab12... OOZIE-1013 Build failing as the license header comment is appearing before xml declaration in some files (virag)
 <workflow-app xmlns="uri:oozie:workflow:0.2" name="sm3-segment-3908-251483">
     <start to="or_0_1"/>
     <kill name="kill">
diff --git a/release-log.txt b/release-log.txt
index f558ebe..2e6cbca 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,7 @@
--- Oozie 3.3.0 release (unreleased)
+-- Oozie 3.3.0 release
 
+OOZIE-1005 Tests from OOZIE-994 use wrong condition in waitFor (rkanter via virag)
+OOZIE-994  ActionCheckXCommand does not handle failures properly(rkanter via mohammad)
 OOZIE-1058 ACL modify-job should not be hardcoded to group name(mona via mohammad)
 OOZIE-1052 HadoopAccessorService.createFileSystem throws exception in map-reduce action, failing workflow.(ryota via mohammad).
 OOZIE-1060 bump hadoop 2.X version to 2.0.2-alpha (rvs via tucu)