OOZIE-3717 When fork actions parallel submit, becasue ForkedActionStartXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 387cbbe..d59d78f 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -56,7 +56,6 @@
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.JobUtils;
@@ -96,6 +95,19 @@
         this.jobId = wfJob.getId();
     }
 
+    public ActionStartXCommand(String actionId, String type, String name) {
+        super(name, type, 0);
+        this.actionId = actionId;
+        this.jobId = Services.get().get(UUIDService.class).getId(actionId);
+    }
+
+    public ActionStartXCommand(WorkflowJobBean job, String actionId, String type, String name) {
+        super(name, type, 0);
+        this.actionId = actionId;
+        this.wfJob = job;
+        this.jobId = wfJob.getId();
+    }
+
     @Override
     protected void setLogInfo() {
         LogUtils.setLogInfo(actionId);
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
index 91da0b8..f5e27e9 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -25,16 +25,17 @@
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.XCommand;
-import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 
 public class ForkedActionStartXCommand extends ActionStartXCommand {
 
+    private final static String FORKED_ACTION_START_NAME = "action.forkedstart";
+
     public ForkedActionStartXCommand(String actionId, String type) {
-        super(actionId, type);
+        super(actionId, type, FORKED_ACTION_START_NAME);
     }
 
     public ForkedActionStartXCommand(WorkflowJobBean wfJob, String id, String type) {
-        super(wfJob, id, type);
+        super(wfJob, id, type, FORKED_ACTION_START_NAME);
     }
 
     protected ActionExecutorContext execute() throws CommandException {
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
index 9ffe2d5..d640150 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
@@ -45,10 +45,8 @@
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.ConfigurationService;
@@ -62,8 +60,15 @@
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XCallable;
+import org.apache.oozie.service.RecoveryService;
 
-import javax.persistence.EntityManager;
+
 
 public class TestSignalXCommand extends XDataTestCase {
 
@@ -554,4 +559,153 @@
             }
         }
     }
+
+    /**
+     * for test {@link #testDeadlockForForkParallelSubmit()}
+     */
+    public static class TestRecoverForkStartActionCallableQueueService extends CallableQueueService{
+        private final XLog log = XLog.getLog(getClass());
+        public static TestSignalXCommand testSignalXCommand;
+
+        /**
+         *  Overwrite for test the same action's ActionStartXCommand in queue and the ForkedActionStartXCommand wouldn't lose,
+         *  if ActionStartXCommand and ForkedActionStartXCommand has the same name, ForkedActionStartXCommand couldn't enqueue
+         *  after a ActionStartXCommand in queue waiting for running.
+         */
+        public class CallableWrapper<E> extends CallableQueueService.CallableWrapper<E> {
+
+            boolean forkedActionStartXCommandFirstEnter;
+            public CallableWrapper(XCallable<E> callable, long delay) {
+                super(callable,delay);
+                forkedActionStartXCommandFirstEnter = callable instanceof ForkedActionStartXCommand;
+            }
+
+
+            public void run() {
+                XCallable<?> callable = getElement();
+                if (forkedActionStartXCommandFirstEnter && callable instanceof ForkedActionStartXCommand){
+                    // make sure there has a ActionStartXCommand in the queue wait to run,
+                    // and then ForkedActionStartXCommand enqueue
+                    testSignalXCommand.waitFor(15 * 1000, new Predicate() {
+                        @Override
+                        public boolean evaluate() throws Exception {
+                            return !filterDuplicates();
+                        }
+                    },200);
+
+                    log.warn("max concurrency for callable [{0}] exceeded, enqueueing with [{1}]ms delay", callable
+                            .getType(), CONCURRENCY_DELAY);
+                    setDelay(CONCURRENCY_DELAY, TimeUnit.MILLISECONDS);
+
+                    try {
+                        Method queue = CallableQueueService.class.getDeclaredMethod("queue",
+                                CallableQueueService.CallableWrapper.class, boolean.class);
+                        queue.setAccessible(true);
+                        queue.invoke(TestRecoverForkStartActionCallableQueueService.this,this,true);
+                    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+                        throw new RuntimeException(e);
+                    }
+                    forkedActionStartXCommandFirstEnter = false;
+                }else {
+                    super.run();
+                }
+            }
+        }
+
+        /**
+         * Replace CallableQueueService.CallableWrapper to TestRecoverForkStartActionCallableQueueService.CallableWrapper
+         * for text, in order to call TestRecoverForkStartActionCallableQueueService.CallableWrapper for wait
+         * ActionStartXCommand enqueue before.
+         *
+         */
+        public <T> Future<T> submit(CallableQueueService.CallableWrapper<T> task) throws InterruptedException {
+            return super.submit(new TestRecoverForkStartActionCallableQueueService.CallableWrapper<T>(task.getElement(),
+                    task.getInitialDelay()));
+        }
+    }
+
+
+    /**
+     * Test : fork parallel submit, the action has the same XCommand in queue, there will skip enqueue by
+     * {@link CallableQueueService.CallableWrapper#filterDuplicates()} so if the ActionStartXCommand  and
+     * ForkedActionStartXCommand has the same name, it would be lost.
+     *
+     * Note : RecoveryService will check the pending action and try to start it. So if the action's ForkedActionStartXCommand
+     * wait for run, there may be a ActionStartXCommand add for the same action.
+     *
+     */
+    public void testDeadlockForForkParallelSubmit() throws Exception {
+        setSystemProperty(Services.CONF_SERVICE_EXT_CLASSES, TestRecoverForkStartActionCallableQueueService.class.getName());
+        TestRecoverForkStartActionCallableQueueService.testSignalXCommand = this;
+
+        services = new Services();
+        Configuration servicesConf = services.getConf();
+        servicesConf.setInt(RecoveryService.CONF_WF_ACTIONS_OLDER_THAN, 0);
+        servicesConf.setInt(RecoveryService.CONF_SERVICE_INTERVAL, 10);
+        services.init();
+
+        ConfigurationService.setBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION, true);
+
+        Configuration conf = new XConfiguration();
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        //@formatter:off
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"wf-fork\">"
+                + "<start to=\"fork1\"/>"
+                + "<fork name=\"fork1\">"
+                + "<path start=\"action1\"/>"
+                + "<path start=\"action2\"/>"
+                + "<path start=\"action3\"/>"
+                + "<path start=\"action4\"/>"
+                + "<path start=\"action5\"/>"
+                + "</fork>"
+                + "<action name=\"action1\">"
+                + "<fs></fs>"
+                + "<ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action2\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action3\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action4\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<action name=\"action5\">"
+                + "<fs></fs><ok to=\"join1\"/>"
+                + "<error to=\"kill\"/>"
+                + "</action>"
+                + "<join name=\"join1\" to=\"end\"/>"
+                + "<kill name=\"kill\"><message>killed</message>"
+                + "</kill><"
+                + "end name=\"end\"/>"
+                + "</workflow-app>";
+        //@Formatter:on
+
+        writeToFile(appXml, workflowUri);
+        conf.set(OozieClient.APP_PATH, workflowUri);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        SubmitXCommand sc = new SubmitXCommand(conf);
+        final String jobId = sc.call();
+        new StartXCommand(jobId).call();
+
+        waitFor(30 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return WorkflowJobQueryExecutor.getInstance()
+                        .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus() == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+
+        assertEquals(WorkflowJobQueryExecutor.getInstance()
+                        .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, jobId)
+                        .getStatus(),
+                WorkflowJob.Status.SUCCEEDED);
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index 5e3b886..7841def 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3717 When fork actions parallel submit, becasue ForkedActionStartXCommand and ActionStartXCommand has the same name, so ForkedActionStartXCommand would be lost, and cause deadlock (chenhd via dionusos)
 OOZIE-3715 Fix fork out more than one transitions submit , one transition submit fail can't execute KillXCommand (chenhd via dionusos)
 OOZIE-3716 Invocation of Main class completed Message is skipped when LauncherSecurityManager calls system exit (khr9603 via dionusos)
 OOZIE-3695 [sharelib-hive2] Fix current SpotBugs discovered issues in Oozie's sharelib-hive2 module (jmakai via dionusos)