[Fix-16942] Fix gloval master failover might cause master dead (#16953)
Co-authored-by: xiangzihao <460888207@qq.com>
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
index c9b89ee..04c5954 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/BaseServerMetadata.java
@@ -28,6 +28,8 @@
@SuperBuilder
public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
+ private final int processId;
+
// The server startup time in milliseconds.
private final long serverStartupTime;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index c49833d..f68e13d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -33,6 +33,7 @@
public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
return MasterServerMetadata.builder()
+ .processId(masterHeartBeat.getProcessId())
.serverStartupTime(masterHeartBeat.getStartupTime())
.address(masterHeartBeat.getHost() + Constants.COLON + masterHeartBeat.getPort())
.cpuUsage(masterHeartBeat.getCpuUsage())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
index d853c7d..c4d1967 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerServerMetadata.java
@@ -41,6 +41,7 @@
public static WorkerServerMetadata parseFromHeartBeat(final WorkerHeartBeat workerHeartBeat) {
return WorkerServerMetadata.builder()
+ .processId(workerHeartBeat.getProcessId())
.serverStartupTime(workerHeartBeat.getStartupTime())
.address(workerHeartBeat.getHost() + Constants.COLON + workerHeartBeat.getPort())
.workerGroup(workerHeartBeat.getWorkerGroup())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
index f2086a6..8e18477 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/system/event/MasterFailoverEvent.java
@@ -29,6 +29,7 @@
public class MasterFailoverEvent extends AbstractSystemEvent {
private final MasterServerMetadata masterServerMetadata;
+ // The time when the event occurred. This might be different at different nodes.
private final Date eventTime;
private MasterFailoverEvent(final MasterServerMetadata masterServerMetadata,
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
index 7bb2490..30a9ae7 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/FailoverCoordinator.java
@@ -44,7 +44,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Component
@@ -66,9 +65,6 @@
private WorkflowInstanceDao workflowInstanceDao;
@Autowired
- private PlatformTransactionManager platformTransactionManager;
-
- @Autowired
private WorkflowFailover workflowFailover;
@Override
@@ -81,13 +77,21 @@
final Optional<MasterServerMetadata> aliveMasterOptional =
clusterManager.getMasterClusters().getServer(masterAddress);
if (aliveMasterOptional.isPresent()) {
+ // If the master is alive, then we use the alive master's startup time as the failover deadline.
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
log.info("The master[{}] is alive, do global master failover on it", aliveMasterServerMetadata);
- doMasterFailover(aliveMasterServerMetadata.getAddress(),
- aliveMasterServerMetadata.getServerStartupTime());
+ doMasterFailover(
+ masterAddress,
+ aliveMasterServerMetadata.getServerStartupTime(),
+ RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
+ masterAddress));
} else {
+ // If the master is not alive, then we use the event time as the failover deadline.
log.info("The master[{}] is not alive, do global master failover on it", masterAddress);
- doMasterFailover(masterAddress, globalMasterFailoverEvent.getEventTime().getTime());
+ doMasterFailover(
+ masterAddress,
+ globalMasterFailoverEvent.getEventTime().getTime(),
+ RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(masterAddress));
}
}
@@ -99,53 +103,55 @@
public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
final MasterServerMetadata masterServerMetadata = masterFailoverEvent.getMasterServerMetadata();
log.info("Master[{}] failover starting", masterServerMetadata);
+ final String masterAddress = masterServerMetadata.getAddress();
final Optional<MasterServerMetadata> aliveMasterOptional =
- clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
+ clusterManager.getMasterClusters().getServer(masterAddress);
if (aliveMasterOptional.isPresent()) {
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
if (aliveMasterServerMetadata.getServerStartupTime() == masterServerMetadata.getServerStartupTime()) {
log.info("The master[{}] is alive, maybe it reconnect to registry skip failover", masterServerMetadata);
- } else {
- log.info("The master[{}] is alive, but the startup time is different, will failover on {}",
- masterServerMetadata,
- aliveMasterServerMetadata);
- doMasterFailover(aliveMasterServerMetadata.getAddress(),
- aliveMasterServerMetadata.getServerStartupTime());
+ return;
}
- } else {
- log.info("The master[{}] is not alive, will failover", masterServerMetadata);
- doMasterFailover(masterServerMetadata.getAddress(), masterServerMetadata.getServerStartupTime());
}
+ doMasterFailover(
+ masterServerMetadata.getAddress(),
+ masterFailoverEvent.getEventTime().getTime(),
+ RegistryUtils.getFailoveredNodePath(
+ masterServerMetadata.getAddress(),
+ masterServerMetadata.getServerStartupTime(),
+ masterServerMetadata.getProcessId()));
}
/**
* Do master failover.
* <p> Will failover the workflow which is scheduled by the master and the workflow's fire time is before the maxWorkflowFireTime.
*/
- private void doMasterFailover(final String masterAddress, final long masterStartupTime) {
+ private void doMasterFailover(final String masterAddress,
+ final long workflowFailoverDeadline,
+ final String masterFailoverNodePath) {
// We use lock to avoid multiple master failover at the same time.
// Once the workflow has been failovered, then it's state will be changed to FAILOVER
// Once the FAILOVER workflow has been refired, then it's host will be changed to the new master and have a new
// start time.
// So if a master has been failovered multiple times, there is no problem.
final StopWatch failoverTimeCost = StopWatch.createStarted();
- registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
+ registryClient.getLock(RegistryUtils.getMasterFailoverLockPath(masterAddress));
try {
- final String failoverFinishedNodePath =
- RegistryUtils.getFailoverFinishedNodePath(masterAddress, masterStartupTime);
- if (registryClient.exists(failoverFinishedNodePath)) {
- log.error("The master[{}-{}] is exist at: {}, means it has already been failovered, skip failover",
+ // If the master has already been failovered, then we skip the failover.
+ if (registryClient.exists(masterFailoverNodePath)
+ && String.valueOf(workflowFailoverDeadline).equals(registryClient.get(masterFailoverNodePath))) {
+ log.error("The master[{}/{}] is exist at: {}, means it has already been failovered, skip failover",
masterAddress,
- masterStartupTime,
- failoverFinishedNodePath);
+ workflowFailoverDeadline,
+ masterFailoverNodePath);
return;
}
final List<WorkflowInstance> needFailoverWorkflows =
- getFailoverWorkflowsForMaster(masterAddress, new Date(masterStartupTime));
+ getFailoverWorkflowsForMaster(masterAddress, new Date(workflowFailoverDeadline));
needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
+ registryClient.persist(masterFailoverNodePath, String.valueOf(workflowFailoverDeadline));
failoverTimeCost.stop();
- registryClient.persist(failoverFinishedNodePath, String.valueOf(System.currentTimeMillis()));
log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
masterAddress,
needFailoverWorkflows.size(),
@@ -190,28 +196,30 @@
final WorkerServerMetadata aliveWorkerServerMetadata = aliveWorkerOptional.get();
if (aliveWorkerServerMetadata.getServerStartupTime() == workerServerMetadata.getServerStartupTime()) {
log.info("The worker[{}] is alive, maybe it reconnect to registry skip failover", workerServerMetadata);
- } else {
- log.info("The worker[{}] is alive, but the startup time is different, will failover on {}",
- workerServerMetadata,
- aliveWorkerServerMetadata);
- doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
- aliveWorkerServerMetadata.getServerStartupTime());
+ return;
}
- } else {
- log.info("The worker[{}] is not alive, will failover", workerServerMetadata);
- doWorkerFailover(workerServerMetadata.getAddress(), workerServerMetadata.getServerStartupTime());
}
+ doWorkerFailover(
+ workerServerMetadata.getAddress(),
+ System.currentTimeMillis(),
+ RegistryUtils.getFailoveredNodePath(
+ workerServerMetadata.getAddress(),
+ workerServerMetadata.getServerStartupTime(),
+ workerServerMetadata.getProcessId()));
}
- private void doWorkerFailover(final String workerAddress, final long workerCrashTime) {
+ private void doWorkerFailover(final String workerAddress,
+ final long taskFailoverDeadline,
+ final String workerFailoverNodePath) {
final StopWatch failoverTimeCost = StopWatch.createStarted();
+ // we don't check the workerFailoverNodePath exist, since the worker may be failovered multiple master
final List<ITaskExecutionRunnable> needFailoverTasks =
- getFailoverTaskForWorker(workerAddress, new Date(workerCrashTime));
+ getFailoverTaskForWorker(workerAddress, new Date(taskFailoverDeadline));
needFailoverTasks.forEach(taskFailover::failoverTask);
registryClient.persist(
- RegistryUtils.getFailoverFinishedNodePath(workerAddress, workerCrashTime),
+ workerFailoverNodePath,
String.valueOf(System.currentTimeMillis()));
failoverTimeCost.stop();
log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
@@ -221,7 +229,7 @@
}
private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String workerAddress,
- final Date workerCrashTime) {
+ final Date taskFailoverDeadline) {
return workflowRepository.getAll()
.stream()
.map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
@@ -237,7 +245,7 @@
// The submitTime should not be null.
// This is a bad case unless someone manually set the submitTime to null.
final Date submitTime = taskExecutionRunnable.getTaskInstance().getSubmitTime();
- return submitTime != null && submitTime.before(workerCrashTime);
+ return submitTime != null && submitTime.before(taskFailoverDeadline);
})
.collect(Collectors.toList());
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
index cf33c48..59174ba 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterHeartBeatTask.java
@@ -86,7 +86,7 @@
@Override
public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
- final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
+ final String failoverNodePath = RegistryUtils.getFailoveredNodePath(masterHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The master: {} is under {}, means it has been failover will close myself",
masterHeartBeat,
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
index 75deb8a..c026231 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java
@@ -26,6 +26,7 @@
FAILOVER_FINISH_NODES("FailoverFinishNodes", "/nodes/failover-finish-nodes"),
+ GLOBAL_MASTER_FAILOVER_LOCK("GlobalMasterFailoverLock", "/lock/global-master-failover"),
MASTER("Master", "/nodes/master"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
index 25ef976..7edcce3 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/utils/RegistryUtils.java
@@ -20,14 +20,30 @@
import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+import com.google.common.base.Preconditions;
+
public class RegistryUtils {
- public static String getFailoverFinishedNodePath(final BaseHeartBeat baseHeartBeat) {
- return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
- baseHeartBeat.getStartupTime());
+ public static String getMasterFailoverLockPath(final String masterAddress) {
+ Preconditions.checkNotNull(masterAddress, "master address cannot be null");
+ return RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterAddress;
}
- public static String getFailoverFinishedNodePath(final String masterAddress, final long masterStartupTime) {
- return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + masterAddress + "-" + masterStartupTime;
+ public static String getFailoveredNodePathWhichStartupTimeIsUnknown(final String serverAddress) {
+ return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + "unknown" + "-"
+ + "unknown";
+ }
+
+ public static String getFailoveredNodePath(final BaseHeartBeat baseHeartBeat) {
+ return getFailoveredNodePath(
+ baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
+ baseHeartBeat.getStartupTime(),
+ baseHeartBeat.getProcessId());
+ }
+
+ public static String getFailoveredNodePath(final String serverAddress, final long serverStartupTime,
+ final int processId) {
+ return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + serverStartupTime
+ + "-" + processId;
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 739cf95..01ed2f5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -85,7 +85,7 @@
@Override
public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
- final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
+ final String failoverNodePath = RegistryUtils.getFailoveredNodePath(workerHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The worker: {} is under {}, means it has been failover will close myself",
workerHeartBeat,