OOZIE-3717 When fork actions are submitted in parallel, ForkedActionStartXCommand is lost because it has the same name as ActionStartXCommand, which causes a deadlock (chenhd via dionusos)
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 387cbbe..d59d78f 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -56,7 +56,6 @@
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.JobUtils;
@@ -96,6 +95,19 @@
this.jobId = wfJob.getId();
}
+ public ActionStartXCommand(String actionId, String type, String name) {
+ super(name, type, 0);
+ this.actionId = actionId;
+ this.jobId = Services.get().get(UUIDService.class).getId(actionId);
+ }
+
+ public ActionStartXCommand(WorkflowJobBean job, String actionId, String type, String name) {
+ super(name, type, 0);
+ this.actionId = actionId;
+ this.wfJob = job;
+ this.jobId = wfJob.getId();
+ }
+
@Override
protected void setLogInfo() {
LogUtils.setLogInfo(actionId);
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
index 91da0b8..f5e27e9 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -25,16 +25,17 @@
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.XCommand;
-import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
public class ForkedActionStartXCommand extends ActionStartXCommand {
+ private final static String FORKED_ACTION_START_NAME = "action.forkedstart";
+
public ForkedActionStartXCommand(String actionId, String type) {
- super(actionId, type);
+ super(actionId, type, FORKED_ACTION_START_NAME);
}
public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
- super(wfJob, id, type);
+ super(wfJob, id, type, FORKED_ACTION_START_NAME);
}
protected ActionExecutorContext execute() throws CommandException {
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index 9ffe2d5..d640150 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -45,10 +45,8 @@
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
@@ -62,8 +60,15 @@
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XCallable;
+import org.apache.oozie.service.RecoveryService;
-import javax.persistence.EntityManager;
+
public class TestSignalXCommand extends XDataTestCase {
@@ -554,4 +559,153 @@
}
}
}
+
+ /**
+ * Test-only {@link CallableQueueService} used by {@link #testDeadlockForForkParallelSubmit()}.
+ */
+ public static class TestRecoverForkStartActionCallableQueueService extends CallableQueueService{
+ private final XLog log = XLog.getLog(getClass());
+ public static TestSignalXCommand testSignalXCommand;
+
+ /**
+ * Overridden to verify that a ForkedActionStartXCommand is not lost while an ActionStartXCommand for the same
+ * action is already in the queue. If ActionStartXCommand and ForkedActionStartXCommand had the same name, the
+ * ForkedActionStartXCommand could not be enqueued while an ActionStartXCommand is in the queue waiting to run.
+ */
+ public class CallableWrapper<E> extends CallableQueueService.CallableWrapper<E> {
+
+ boolean forkedActionStartXCommandFirstEnter;
+ public CallableWrapper(XCallable<E> callable, long delay) {
+ super(callable,delay);
+ forkedActionStartXCommandFirstEnter = callable instanceof ForkedActionStartXCommand;
+ }
+
+
+ public void run() {
+ XCallable<?> callable = getElement();
+ if (forkedActionStartXCommandFirstEnter && callable instanceof ForkedActionStartXCommand){
+ // make sure there is an ActionStartXCommand in the queue waiting to run,
+ // and only then enqueue the ForkedActionStartXCommand
+ testSignalXCommand.waitFor(15 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return !filterDuplicates();
+ }
+ },200);
+
+ log.warn("max concurrency for callable [{0}] exceeded, enqueueing with [{1}]ms delay", callable
+ .getType(), CONCURRENCY_DELAY);
+ setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
+
+ try {
+ Method queue = CallableQueueService.class.getDeclaredMethod("queue",
+ CallableQueueService.CallableWrapper.class, boolean.class);
+ queue.setAccessible(true);
+ queue.invoke(TestRecoverForkStartActionCallableQueueService.this,this,true);
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ forkedActionStartXCommandFirstEnter = false;
+ }else {
+ super.run();
+ }
+ }
+ }
+
+ /**
+ * Replaces CallableQueueService.CallableWrapper with TestRecoverForkStartActionCallableQueueService.CallableWrapper
+ * for the test, so that a ForkedActionStartXCommand waits until an ActionStartXCommand has been enqueued
+ * before it is queued again.
+ *
+ */
+ public <T> Future<T> submit(CallableQueueService.CallableWrapper<T> task) throws InterruptedException {
+ return super.submit(new TestRecoverForkStartActionCallableQueueService.CallableWrapper<T>(task.getElement(),
+ task.getInitialDelay()));
+ }
+ }
+
+
+ /**
+ * Test: with fork parallel submission, if the same XCommand for an action is already in the queue, enqueueing is
+ * skipped by {@link CallableQueueService.CallableWrapper#filterDuplicates()}. So if ActionStartXCommand and
+ * ForkedActionStartXCommand had the same name, the ForkedActionStartXCommand would be lost.
+ *
+ * Note: RecoveryService checks pending actions and tries to start them. So while an action's
+ * ForkedActionStartXCommand is waiting to run, an ActionStartXCommand may be added for the same action.
+ *
+ */
+ public void testDeadlockForForkParallelSubmit() throws Exception {
+ setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, TestRecoverForkStartActionCallableQueueService.class.getName());
+ TestRecoverForkStartActionCallableQueueService.testSignalXCommand = this;
+
+ services = new Services();
+ Configuration servicesConf = services.getConf();
+ servicesConf.setInt(RecoveryService.CONF_WF_ACTIONS_OLDER_THAN, 0);
+ servicesConf.setInt(RecoveryService.CONF_SERVICE_INTERVAL, 10);
+ services.init();
+
+ ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+
+ Configuration conf = new XConfiguration();
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ //@formatter:off
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork\">"
+ + "<start to=\"fork1\"/>"
+ + "<fork name=\"fork1\">"
+ + "<path start=\"action1\"/>"
+ + "<path start=\"action2\"/>"
+ + "<path start=\"action3\"/>"
+ + "<path start=\"action4\"/>"
+ + "<path start=\"action5\"/>"
+ + "</fork>"
+ + "<action name=\"action1\">"
+ + "<fs></fs>"
+ + "<ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action2\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action3\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action4\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<action name=\"action5\">"
+ + "<fs></fs><ok to=\"join1\"/>"
+ + "<error to=\"kill\"/>"
+ + "</action>"
+ + "<join name=\"join1\" to=\"end\"/>"
+ + "<kill name=\"kill\"><message>killed</message>"
+ + "</kill><"
+ + "end name=\"end\"/>"
+ + "</workflow-app>";
+ //@formatter:on
+
+ writeToFile(appXml, workflowUri);
+ conf.set(OozieClient.APP_PATH, workflowUri);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ final String jobId = sc.call();
+ new StartXCommand(jobId).call();
+
+ waitFor(30 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return WorkflowJobQueryExecutor.getInstance()
+ .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+ .getStatus() == WorkflowJob.Status.SUCCEEDED;
+ }
+ });
+
+ assertEquals(WorkflowJobQueryExecutor.getInstance()
+ .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+ .getStatus(),
+ WorkflowJob.Status.SUCCEEDED);
+ }
}
diff --git a/release-log.txt b/release-log.txt
index 5e3b886..7841def 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3717 When fork actions are submitted in parallel, ForkedActionStartXCommand is lost because it has the same name as ActionStartXCommand, which causes a deadlock (chenhd via dionusos)
OOZIE-3715 Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand (chenhd via dionusos)
OOZIE-3716 Invocation of Main class completed Message is skipped when LauncherSecurityManager calls system exit (khr9603 via dionusos)
OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's sharelib-hive2 module (jmakai via dionusos)