[Fix-16942] Fix gloval master failover might cause master dead (#16953)

Co-authored-by: xiangzihao <460888207@qq.com>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
index c9b89ee..04c5954 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
@@ -28,6 +28,8 @@
 @SuperBuilder
 public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
 
+    private final int processId;
+
     // The server startup time in milliseconds.
     private final long serverStartupTime;
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index c49833d..f68e13d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -33,6 +33,7 @@
 
     public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
         return MasterServerMetadata.builder()
+                .processId(masterHeartBeat.getProcessId())
                 .serverStartupTime(masterHeartBeat.getStartupTime())
                 .address(masterHeartBeat.getHost() + Constants.COLON + masterHeartBeat.getPort())
                 .cpuUsage(masterHeartBeat.getCpuUsage())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
index d853c7d..c4d1967 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
@@ -41,6 +41,7 @@
 
     public static WorkerServerMetadata parseFromHeartBeat(final WorkerHeartBeat workerHeartBeat) {
         return WorkerServerMetadata.builder()
+                .processId(workerHeartBeat.getProcessId())
                 .serverStartupTime(workerHeartBeat.getStartupTime())
                 .address(workerHeartBeat.getHost() + Constants.COLON + workerHeartBeat.getPort())
                 .workerGroup(workerHeartBeat.getWorkerGroup())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
index f2086a6..8e18477 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
@@ -29,6 +29,7 @@
 public class MasterFailoverEvent extends AbstractSystemEvent {
 
     private final MasterServerMetadata masterServerMetadata;
+    // The time when the event occurred. This might be different at different nodes.
     private final Date eventTime;
 
     private MasterFailoverEvent(final MasterServerMetadata masterServerMetadata,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 7bb2490..30a9ae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -44,7 +44,6 @@
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
 
 @Slf4j
 @Component
@@ -66,9 +65,6 @@
     private WorkflowInstanceDao workflowInstanceDao;
 
     @Autowired
-    private PlatformTransactionManager platformTransactionManager;
-
-    @Autowired
     private WorkflowFailover workflowFailover;
 
     @Override
@@ -81,13 +77,21 @@
             final Optional<MasterServerMetadata> aliveMasterOptional =
                     clusterManager.getMasterClusters().getServer(masterAddress);
             if (aliveMasterOptional.isPresent()) {
+                // If the master is alive, then we use the alive master's startup time as the failover deadline.
                 final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
                 log.info("The master[{}] is alive, do global master failover on it", aliveMasterServerMetadata);
-                doMasterFailover(aliveMasterServerMetadata.getAddress(),
-                        aliveMasterServerMetadata.getServerStartupTime());
+                doMasterFailover(
+                        masterAddress,
+                        aliveMasterServerMetadata.getServerStartupTime(),
+                        RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
+                                masterAddress));
             } else {
+                // If the master is not alive, then we use the event time as the failover deadline.
                 log.info("The master[{}] is not alive, do global master failover on it", masterAddress);
-                doMasterFailover(masterAddress, globalMasterFailoverEvent.getEventTime().getTime());
+                doMasterFailover(
+                        masterAddress,
+                        globalMasterFailoverEvent.getEventTime().getTime(),
+                        RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(masterAddress));
             }
         }
 
@@ -99,53 +103,55 @@
     public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
         final MasterServerMetadata masterServerMetadata = masterFailoverEvent.getMasterServerMetadata();
         log.info("Master[{}] failover starting", masterServerMetadata);
+        final String masterAddress = masterServerMetadata.getAddress();
 
         final Optional<MasterServerMetadata> aliveMasterOptional =
-                clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
+                clusterManager.getMasterClusters().getServer(masterAddress);
         if (aliveMasterOptional.isPresent()) {
             final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
             if (aliveMasterServerMetadata.getServerStartupTime() == masterServerMetadata.getServerStartupTime()) {
                 log.info("The master[{}] is alive, maybe it reconnect to registry skip failover", masterServerMetadata);
-            } else {
-                log.info("The master[{}] is alive, but the startup time is different, will failover on {}",
-                        masterServerMetadata,
-                        aliveMasterServerMetadata);
-                doMasterFailover(aliveMasterServerMetadata.getAddress(),
-                        aliveMasterServerMetadata.getServerStartupTime());
+                return;
             }
-        } else {
-            log.info("The master[{}] is not alive, will failover", masterServerMetadata);
-            doMasterFailover(masterServerMetadata.getAddress(), masterServerMetadata.getServerStartupTime());
         }
+        doMasterFailover(
+                masterServerMetadata.getAddress(),
+                masterFailoverEvent.getEventTime().getTime(),
+                RegistryUtils.getFailoveredNodePath(
+                        masterServerMetadata.getAddress(),
+                        masterServerMetadata.getServerStartupTime(),
+                        masterServerMetadata.getProcessId()));
     }
 
     /**
      * Do master failover.
      * <p> Will failover the workflow which is scheduled by the master and the workflow's fire time is before the maxWorkflowFireTime.
      */
-    private void doMasterFailover(final String masterAddress, final long masterStartupTime) {
+    private void doMasterFailover(final String masterAddress,
+                                  final long workflowFailoverDeadline,
+                                  final String masterFailoverNodePath) {
         // We use lock to avoid multiple master failover at the same time.
         // Once the workflow has been failovered, then it's state will be changed to FAILOVER
         // Once the FAILOVER workflow has been refired, then it's host will be changed to the new master and have a new
         // start time.
         // So if a master has been failovered multiple times, there is no problem.
         final StopWatch failoverTimeCost = StopWatch.createStarted();
-        registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
+        registryClient.getLock(RegistryUtils.getMasterFailoverLockPath(masterAddress));
         try {
-            final String failoverFinishedNodePath =
-                    RegistryUtils.getFailoverFinishedNodePath(masterAddress, masterStartupTime);
-            if (registryClient.exists(failoverFinishedNodePath)) {
-                log.error("The master[{}-{}] is exist at: {}, means it has already been failovered, skip failover",
+            // If the master has already been failovered, then we skip the failover.
+            if (registryClient.exists(masterFailoverNodePath)
+                    && String.valueOf(workflowFailoverDeadline).equals(registryClient.get(masterFailoverNodePath))) {
+                log.error("The master[{}/{}] is exist at: {}, means it has already been failovered, skip failover",
                         masterAddress,
-                        masterStartupTime,
-                        failoverFinishedNodePath);
+                        workflowFailoverDeadline,
+                        masterFailoverNodePath);
                 return;
             }
             final List<WorkflowInstance> needFailoverWorkflows =
-                    getFailoverWorkflowsForMaster(masterAddress, new Date(masterStartupTime));
+                    getFailoverWorkflowsForMaster(masterAddress, new Date(workflowFailoverDeadline));
             needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
+            registryClient.persist(masterFailoverNodePath, String.valueOf(workflowFailoverDeadline));
             failoverTimeCost.stop();
-            registryClient.persist(failoverFinishedNodePath, String.valueOf(System.currentTimeMillis()));
             log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
                     masterAddress,
                     needFailoverWorkflows.size(),
@@ -190,28 +196,30 @@
             final WorkerServerMetadata aliveWorkerServerMetadata = aliveWorkerOptional.get();
             if (aliveWorkerServerMetadata.getServerStartupTime() == workerServerMetadata.getServerStartupTime()) {
                 log.info("The worker[{}] is alive, maybe it reconnect to registry skip failover", workerServerMetadata);
-            } else {
-                log.info("The worker[{}] is alive, but the startup time is different, will failover on {}",
-                        workerServerMetadata,
-                        aliveWorkerServerMetadata);
-                doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
-                        aliveWorkerServerMetadata.getServerStartupTime());
+                return;
             }
-        } else {
-            log.info("The worker[{}] is not alive, will failover", workerServerMetadata);
-            doWorkerFailover(workerServerMetadata.getAddress(), workerServerMetadata.getServerStartupTime());
         }
+        doWorkerFailover(
+                workerServerMetadata.getAddress(),
+                System.currentTimeMillis(),
+                RegistryUtils.getFailoveredNodePath(
+                        workerServerMetadata.getAddress(),
+                        workerServerMetadata.getServerStartupTime(),
+                        workerServerMetadata.getProcessId()));
     }
 
-    private void doWorkerFailover(final String workerAddress, final long workerCrashTime) {
+    private void doWorkerFailover(final String workerAddress,
+                                  final long taskFailoverDeadline,
+                                  final String workerFailoverNodePath) {
         final StopWatch failoverTimeCost = StopWatch.createStarted();
+        // we don't check the workerFailoverNodePath exist, since the worker may be failovered multiple master
 
         final List<ITaskExecutionRunnable> needFailoverTasks =
-                getFailoverTaskForWorker(workerAddress, new Date(workerCrashTime));
+                getFailoverTaskForWorker(workerAddress, new Date(taskFailoverDeadline));
         needFailoverTasks.forEach(taskFailover::failoverTask);
 
         registryClient.persist(
-                RegistryUtils.getFailoverFinishedNodePath(workerAddress, workerCrashTime),
+                workerFailoverNodePath,
                 String.valueOf(System.currentTimeMillis()));
         failoverTimeCost.stop();
         log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
@@ -221,7 +229,7 @@
     }
 
     private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String workerAddress,
-                                                                  final Date workerCrashTime) {
+                                                                  final Date taskFailoverDeadline) {
         return workflowRepository.getAll()
                 .stream()
                 .map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
@@ -237,7 +245,7 @@
                     // The submitTime should not be null.
                     // This is a bad case unless someone manually set the submitTime to null.
                     final Date submitTime = taskExecutionRunnable.getTaskInstance().getSubmitTime();
-                    return submitTime != null && submitTime.before(workerCrashTime);
+                    return submitTime != null && submitTime.before(taskFailoverDeadline);
                 })
                 .collect(Collectors.toList());
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index cf33c48..59174ba 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -86,7 +86,7 @@
 
     @Override
     public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
-        final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
+        final String failoverNodePath = RegistryUtils.getFailoveredNodePath(masterHeartBeat);
         if (registryClient.exists(failoverNodePath)) {
             log.warn("The master: {} is under {}, means it has been failover will close myself",
                     masterHeartBeat,
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 75deb8a..c026231 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -26,6 +26,7 @@
 
     FAILOVER_FINISH_NODES("FailoverFinishNodes", "/nodes/failover-finish-nodes"),
 
+    GLOBAL_MASTER_FAILOVER_LOCK("GlobalMasterFailoverLock", "/lock/global-master-failover"),
     MASTER("Master", "/nodes/master"),
     MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
     MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
index 25ef976..7edcce3 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
@@ -20,14 +20,30 @@
 import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
 import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
 
+import com.google.common.base.Preconditions;
+
 public class RegistryUtils {
 
-    public static String getFailoverFinishedNodePath(final BaseHeartBeat baseHeartBeat) {
-        return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
-                baseHeartBeat.getStartupTime());
+    public static String getMasterFailoverLockPath(final String masterAddress) {
+        Preconditions.checkNotNull(masterAddress, "master address cannot be null");
+        return RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterAddress;
     }
 
-    public static String getFailoverFinishedNodePath(final String masterAddress, final long masterStartupTime) {
-        return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + masterAddress + "-" + masterStartupTime;
+    public static String getFailoveredNodePathWhichStartupTimeIsUnknown(final String serverAddress) {
+        return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + "unknown" + "-"
+                + "unknown";
+    }
+
+    public static String getFailoveredNodePath(final BaseHeartBeat baseHeartBeat) {
+        return getFailoveredNodePath(
+                baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
+                baseHeartBeat.getStartupTime(),
+                baseHeartBeat.getProcessId());
+    }
+
+    public static String getFailoveredNodePath(final String serverAddress, final long serverStartupTime,
+                                               final int processId) {
+        return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + serverStartupTime
+                + "-" + processId;
     }
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 739cf95..01ed2f5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -85,7 +85,7 @@
 
     @Override
     public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
-        final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
+        final String failoverNodePath = RegistryUtils.getFailoveredNodePath(workerHeartBeat);
         if (registryClient.exists(failoverNodePath)) {
             log.warn("The worker: {} is under {}, means it has been failover will close myself",
                     workerHeartBeat,