fix cluster failover (#450)

diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java
index 5a9b352..2bb6ed3 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java
@@ -22,7 +22,11 @@
         super(MESSAGE);
     }
 
-    public GeaflowHeartbeatException(Throwable cause) {
+    public GeaflowHeartbeatException(String message) {
+        super(message);
+    }
+
+    public GeaflowHeartbeatException(String message, Throwable cause) {
-        super(MESSAGE, cause);
+        super(message, cause);
     }
 }
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java
index 22cfe06..3ebe8a3 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java
@@ -20,7 +20,9 @@
 import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
+import java.util.List;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,14 +114,26 @@
         return pid;
     }
 
-    public static synchronized void killProcess(int pid) {
-        LOGGER.info("Kill -9 {}", pid);
+    public static void killProcess(int pid) {
+        execute("kill -9 " + pid);
+    }
+
+    public static void killProcesses(List<Integer> pidList) {
+        String pids = StringUtils.join(pidList, " ");
+        execute("kill -9 " + pids);
+    }
+
+    public static void execute(String cmd) {
+        LOGGER.info(cmd);
         try {
-            Process process = Runtime.getRuntime().exec("kill -9 " + pid);
+            Process process = Runtime.getRuntime().exec(cmd);
             process.waitFor();
         } catch (InterruptedException | IOException e) {
-            LOGGER.error("Kill {} failed: {}", pid, e.getMessage());
-            throw new GeaflowRuntimeException(e);
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            LOGGER.error(" {} failed: {}", cmd, e.getMessage(), e);
+            throw new GeaflowRuntimeException(e.getMessage(), e);
         }
     }
 
diff --git a/geaflow/geaflow-common/src/test/java/com/antgroup/geaflow/common/utils/ProcessUtilTest.java b/geaflow/geaflow-common/src/test/java/com/antgroup/geaflow/common/utils/ProcessUtilTest.java
new file mode 100644
index 0000000..fda14f5
--- /dev/null
+++ b/geaflow/geaflow-common/src/test/java/com/antgroup/geaflow/common/utils/ProcessUtilTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2023 AntGroup CO., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.antgroup.geaflow.common.utils;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertThrows;
+
+import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
+import java.io.IOException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class ProcessUtilTest {
+
+    @Mock
+    private Runtime runtime;
+
+    @Mock
+    private Process process;
+
+    @BeforeMethod
+    public void setUp() throws IOException {
+        MockitoAnnotations.initMocks(this);
+        when(runtime.exec(anyString())).thenReturn(process);
+    }
+
+    @Test
+    public void execute_CommandThrowsIOException_ExceptionHandled() throws IOException, InterruptedException {
+        String cmd = "some command";
+        when(runtime.exec(anyString())).thenThrow(new IOException("IO error"));
+
+        assertThrows(GeaflowRuntimeException.class, () -> ProcessUtil.execute(cmd));
+    }
+
+    @Test
+    public void execute_CommandThrowsInterruptedException_ExceptionHandled() throws IOException, InterruptedException {
+        String cmd = "some command";
+        when(runtime.exec(anyString())).thenReturn(process);
+        doThrow(new InterruptedException("Interrupted")).when(process).waitFor();
+
+        assertThrows(GeaflowRuntimeException.class, () -> ProcessUtil.execute(cmd));
+    }
+}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java
index 0f97888..816684a 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java
@@ -102,19 +102,21 @@
         ClusterMetaStore metaStore = ClusterMetaStore.getInstance(id, name, config);
         Map<Integer, String> drivers = metaStore.getDriverIds();
         Map<Integer, String> containerIds = metaStore.getContainerIds();
-        if (drivers != null && !drivers.isEmpty() && containerIds != null && !containerIds
-            .isEmpty()) {
+        int driverNum = drivers == null ? 0 : drivers.size();
+        int containerNum = containerIds == null ? 0 : containerIds.size();
+        if (driverNum != 0 && containerNum != 0) {
             this.isRecover = true;
             this.driverIds = drivers;
             this.containerIds = containerIds;
             this.maxComponentId = metaStore.getMaxContainerId();
-            LOGGER.info("recover {} containers and {} drivers maxComponentId {} from metaStore",
-                containerIds.size(), drivers.size(), maxComponentId);
+            LOGGER.info("recover {} containers and {} drivers maxComponentId: {}",
+                containerNum, driverNum, maxComponentId);
         } else {
             this.isRecover = false;
             this.driverIds = new ConcurrentHashMap<>();
             this.containerIds = new ConcurrentHashMap<>();
             this.maxComponentId = 0;
+            LOGGER.info("init with maxComponentId: {}", maxComponentId);
         }
     }
 
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java
index e5e5d64..3bae59e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java
@@ -158,9 +158,12 @@
             Thread.currentThread().setName(this.name);
             try {
                 this.fetch();
+            } catch (GeaflowRuntimeException e) {
+                LOGGER.error("fetcher task err with window id {} {}", this.targetWindowId, this.name, e);
+                throw e;
             } catch (Throwable e) {
                 LOGGER.error("fetcher task err with window id {} {}", this.targetWindowId, this.name, e);
-                throw new GeaflowRuntimeException(e);
+                throw new GeaflowRuntimeException(e.getMessage(), e);
             }
         }
 
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java
index 30eb46d..ce77c4c 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java
@@ -63,9 +63,8 @@
     private final ScheduledFuture<?> reportFuture;
     private final ScheduledExecutorService checkTimeoutService;
     private final ScheduledExecutorService heartbeatReportService;
-    private final GeaflowHeartbeatException timeoutException;
     private ScheduledFuture<?> checkFuture;
-    private IStatsWriter statsWriter;
+    private final IStatsWriter statsWriter;
 
     public HeartbeatManager(Configuration config, IClusterManager clusterManager) {
         this.senderMap = new ConcurrentHashMap<>();
@@ -86,6 +85,7 @@
             this.checkFuture = checkTimeoutService.scheduleAtFixedRate(this::checkSupervisorHealth,
                 heartbeatCheckMs, heartbeatCheckMs, TimeUnit.MILLISECONDS);
         }
+
         this.heartbeatReportService = new ScheduledThreadPoolExecutor(1,
             ThreadUtil.namedThreadFactory(true, "heartbeat-report"));
         this.reportFuture = heartbeatReportService
@@ -93,7 +93,6 @@
                 TimeUnit.MILLISECONDS);
 
         this.clusterManager = (AbstractClusterManager) clusterManager;
-        this.timeoutException = new GeaflowHeartbeatException();
         this.statsWriter = StatsCollectorFactory.init(config).getStatsWriter();
     }
 
@@ -107,7 +106,7 @@
         this.statsWriter.addMetric(masterInfo.getName(), masterInfo);
     }
 
-    private void checkHeartBeat() {
+    void checkHeartBeat() {
         try {
             long checkTime = System.currentTimeMillis();
             checkTimeout(clusterManager.getContainerIds(), checkTime);
@@ -128,8 +127,9 @@
                     LOGGER.warn("{} is not registered", entry.getValue());
                 }
             } else if (checkTime > heartbeat.getTimestamp() + heartbeatTimeoutMs) {
-                LOGGER.error("{} heartbeat is missing", entry.getValue());
-                clusterManager.doFailover(componentId, timeoutException);
+                String message = String.format("%s heartbeat is lost", entry.getValue());
+                LOGGER.error(message);
+                doFailover(componentId, new GeaflowHeartbeatException(message));
             }
         }
     }
@@ -147,7 +147,7 @@
             checkSupervisorHealth(clusterManager.getContainerIds());
             checkSupervisorHealth(clusterManager.getDriverIds());
         } catch (Throwable e) {
-            LOGGER.warn("Check container healthy error", e);
+            LOGGER.warn("Check container healthy error: {}", e.getMessage(), e);
         }
     }
 
@@ -157,17 +157,23 @@
             try {
                 StatusResponse response = RpcClient.getInstance().querySupervisorStatus(name);
                 if (!response.getIsAlive()) {
-                    LOGGER.info("Found {} is not alive and do failover", name);
-                    clusterManager.doFailover(entry.getKey(), timeoutException);
+                    String message = String.format("supervisor of %s is not alive", name);
+                    LOGGER.error(message);
+                    doFailover(entry.getKey(), new GeaflowHeartbeatException(message));
                 }
             } catch (Throwable e) {
-                LOGGER.error("Try to do failover due to exception from {}: {}", name,
-                    e.getMessage());
-                clusterManager.doFailover(entry.getKey(), e);
+                String message = String.format("%s %s", name, e.getMessage());
+                LOGGER.error(message, e);
+                doFailover(entry.getKey(), new GeaflowHeartbeatException(message, e));
             }
         }
     }
 
+    void doFailover(int componentId, Throwable e) {
+        StatsCollectorFactory.getInstance().getExceptionCollector().reportException(e);
+        clusterManager.doFailover(componentId, e);
+    }
+
     protected boolean isRegistered(int componentId) {
         AbstractClusterManager cm = clusterManager;
         return cm.getContainerInfos().containsKey(componentId) || cm.getDriverInfos()
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java
index a68a1df..f89f54d 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java
@@ -66,22 +66,28 @@
                 int code = childProcess.waitFor();
                 LOGGER.warn("Child process {} exits with code: {} and alive: {}", pid, code,
                     childProcess.isAlive());
-                if (code == 0) {
-                    break;
+                // 0: success, 137: killed by SIGKILL, 143: killed by SIGTERM
+                if (code == 0 || code == 137 || code == 143) {
+                    return;
                 }
                 if (restarts == 0) {
-                    String errMsg = String.format("Latest process %s exits with code: %s: "
-                            + "Exhausted after retrying startup %s times. ", pid, code, maxRestarts + 1);
+                    String errMsg;
+                    if (maxRestarts == 0) {
+                        errMsg = String.format("process exits code: %s", code);
+                    } else {
+                        errMsg = String.format("process exits code: %s, exhausted %s restarts",
+                            code, maxRestarts);
+                    }
                     throw new GeaflowRuntimeException(errMsg);
                 }
                 restarts--;
             } while (true);
-        } catch (Exception e) {
+        } catch (GeaflowRuntimeException e) {
             LOGGER.error("FATAL: start command failed: {}", command, e);
-            if (e instanceof GeaflowRuntimeException) {
-                throw (GeaflowRuntimeException) e;
-            }
-            throw new GeaflowRuntimeException(e);
+            throw e;
+        } catch (Throwable e) {
+            LOGGER.error("FATAL: start command failed: {}", command, e);
+            throw new GeaflowRuntimeException(e.getMessage(), e);
         }
     }
 
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java
index 373c68d..16ce654 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java
@@ -33,7 +33,9 @@
 import com.baidu.brpc.server.RpcServerOptions;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,23 +106,27 @@
         return false;
     }
 
+    public void stopWorker() {
+        stopWorker(mainRunner.getProcessId());
+    }
+
     public void stopWorker(int pid) {
         Preconditions.checkArgument(pid > 0, "pid should be larger than 0");
-        LOGGER.info("Kill process: {}", pid);
-        ProcessUtil.killProcess(pid);
+        LOGGER.info("Stop old process if exists");
+
+        List<Integer> pids = new ArrayList<>();
+        pids.add(pid);
 
         Process process = mainRunner.getProcess();
         if (process.isAlive()) {
-            int ppid = ProcessUtil.getProcessPid(process);
-            if (ppid <= 0) {
-                LOGGER.warn("NOT found live process {}", process);
-                return;
-            }
-            if (pid != ppid) {
-                LOGGER.info("Kill parent process: {}", ppid);
-                process.destroy();
+            int curPid = mainRunner.getProcessId();
+            if (curPid <= 0) {
+                LOGGER.warn("Process is alive but pid not found: {}", process);
+            } else if (pid != curPid) {
+                pids.add(curPid);
             }
         }
+        ProcessUtil.killProcesses(pids);
     }
 
     public void startAgent() {
@@ -139,4 +145,10 @@
         }
     }
 
+    public void stop() {
+        if (rpcService != null) {
+            rpcService.stopService();
+        }
+    }
+
 }
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java
index 5993cc8..71915d0 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java
@@ -42,7 +42,7 @@
     private final String clusterId;
     private final Configuration configuration;
     private final IClusterMetaKVStore<String, Object> componentBackend;
-    private final Map<String, IClusterMetaKVStore<String, Object>> backends;
+    private Map<String, IClusterMetaKVStore<String, Object>> backends;
 
     private ClusterMetaStore(int id, String name, Configuration configuration) {
         this.componentId = id;
@@ -79,7 +79,9 @@
     public static synchronized void close() {
         LOGGER.info("close ClusterMetaStore");
         if (INSTANCE != null) {
-            for (IClusterMetaKVStore<String, Object> backend : INSTANCE.backends.values()) {
+            Map<String, IClusterMetaKVStore<String, Object>> backends = INSTANCE.backends;
+            INSTANCE.backends = null;
+            for (IClusterMetaKVStore<String, Object> backend : backends.values()) {
                 backend.close();
             }
             INSTANCE = null;
@@ -220,6 +222,10 @@
             default:
                 return componentBackend;
         }
+        // Cluster meta store is closed.
+        if (backends == null) {
+            return null;
+        }
         if (!backends.containsKey(namespace)) {
             synchronized (ClusterMetaStore.class) {
                 if (!backends.containsKey(namespace)) {
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManagerTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManagerTest.java
new file mode 100644
index 0000000..a3e5b48
--- /dev/null
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManagerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2023 AntGroup CO., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.antgroup.geaflow.cluster.heartbeat;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+
+import com.antgroup.geaflow.cluster.clustermanager.AbstractClusterManager;
+import com.antgroup.geaflow.cluster.container.ContainerInfo;
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
+import com.antgroup.geaflow.common.exception.GeaflowHeartbeatException;
+import com.antgroup.geaflow.common.heartbeat.Heartbeat;
+import com.antgroup.geaflow.common.utils.SleepUtils;
+import com.antgroup.geaflow.rpc.proto.Master.HeartbeatResponse;
+import com.antgroup.geaflow.stats.sink.IStatsWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class HeartbeatManagerTest {
+
+    @Mock
+    private AbstractClusterManager clusterManager;
+
+    @Mock
+    private IStatsWriter statsWriter;
+
+    private HeartbeatManager heartbeatManager;
+
+    @BeforeMethod
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        Configuration config = new Configuration();
+        config.put(ExecutionConfigKeys.HEARTBEAT_INITIAL_DELAY_MS, "60000");
+        config.put(ExecutionConfigKeys.HEARTBEAT_INTERVAL_MS, "60000");
+        config.put(ExecutionConfigKeys.HEARTBEAT_TIMEOUT_MS, "500");
+        heartbeatManager = new HeartbeatManager(config, clusterManager);
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        heartbeatManager.close();
+    }
+
+    @Test
+    public void receivedHeartbeat_RegisteredHeartbeat_ReturnsSuccessAndRegistered() {
+        Heartbeat heartbeat = new Heartbeat(1);
+        when(clusterManager.getContainerInfos()).thenReturn(new HashMap<Integer, ContainerInfo>() {{
+            put(1, new ContainerInfo());
+        }});
+
+        HeartbeatResponse response = heartbeatManager.receivedHeartbeat(heartbeat);
+
+        assertEquals(true, response.getSuccess());
+        assertEquals(true, response.getRegistered());
+    }
+
+    @Test
+    public void receivedHeartbeat_UnregisteredHeartbeat_ReturnsSuccessAndNotRegistered() {
+        Heartbeat heartbeat = new Heartbeat(2);
+        when(clusterManager.getContainerInfos()).thenReturn(new HashMap<Integer, ContainerInfo>());
+
+        HeartbeatResponse response = heartbeatManager.receivedHeartbeat(heartbeat);
+
+        assertEquals(true, response.getSuccess());
+        assertEquals(false, response.getRegistered());
+    }
+
+    @Test
+    public void checkHeartBeat_LogsWarningsAndErrors() {
+        Map<Integer, String> containerMap = new HashMap<>();
+        containerMap.put(1, "container1");
+        when(clusterManager.getContainerIds()).thenReturn(containerMap);
+        when(clusterManager.getDriverIds()).thenReturn(new HashMap<Integer, String>());
+
+        Heartbeat heartbeat = new Heartbeat(1);
+        heartbeatManager.receivedHeartbeat(heartbeat);
+        SleepUtils.sleepMilliSecond(600);
+        heartbeatManager.checkHeartBeat();
+
+        verify(clusterManager, times(1)).doFailover(eq(1), isA(GeaflowHeartbeatException.class));
+    }
+
+    @Test
+    public void doFailover_ReportsExceptionAndCallsFailover() {
+        int componentId = 1;
+        Throwable exception = new RuntimeException("Test exception");
+
+        heartbeatManager.doFailover(componentId, exception);
+
+        verify(clusterManager, times(1)).doFailover(eq(componentId), eq(exception));
+    }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/runner/SupervisorTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/runner/SupervisorTest.java
new file mode 100644
index 0000000..90fe276
--- /dev/null
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/runner/SupervisorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2023 AntGroup CO., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.antgroup.geaflow.cluster.runner;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.utils.SleepUtils;
+import java.util.HashMap;
+import java.util.Map;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class SupervisorTest {
+    @Test
+    public void test() {
+        Configuration configuration = new Configuration();
+        Map<String, String> envMap = new HashMap<>();
+        String cmd = "sleep 30000";
+        Supervisor supervisor = new Supervisor(cmd, configuration,false);
+        supervisor.startWorker();
+        // wait for process starts.
+        while (!supervisor.isWorkerAlive()) {
+            SleepUtils.sleepMilliSecond(500);
+        }
+        supervisor.stopWorker();
+        // wait for process exits.
+        SleepUtils.sleepSecond(1);
+        Assert.assertFalse(supervisor.isWorkerAlive());
+        supervisor.stop();
+    }
+}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java b/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java
index cf9da1b..a92b4bc 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java
@@ -258,7 +258,7 @@
                     }
                 }
             } catch (IOException e) {
-                throw new GeaflowRuntimeException(e);
+                throw new GeaflowRuntimeException(e.getMessage(), e);
             }
 
             return false;
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java
index 6c7f538..dfd39f3 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java
@@ -14,6 +14,7 @@
 
 package com.antgroup.geaflow.operator.impl.graph.compute.dynamic;
 
+import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.ENABLE_DETAIL_METRIC;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +40,7 @@
 import com.antgroup.geaflow.view.IViewDesc;
 import com.antgroup.geaflow.view.graph.GraphViewDesc;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.testng.Assert;
@@ -55,7 +57,8 @@
                 Integer.class, ValueEdge.class, IntegerType.class))
             .build();
 
-        DynamicGraphVertexCentricComputeOp operator = new DynamicGraphVertexCentricComputeOp(graphViewDesc, new IncVertexCentricCompute(5) {
+        DynamicGraphVertexCentricComputeOp operator = new DynamicGraphVertexCentricComputeOp(
+            graphViewDesc, new IncVertexCentricCompute(5) {
             @Override
             public IncVertexCentricComputeFunction getIncComputeFunction() {
                 return mock(IncVertexCentricComputeFunction.class);
@@ -66,7 +69,11 @@
                 return null;
             }
         });
-        ((AbstractOperator)operator).getOpArgs().setOpName("test");
+        ((AbstractOperator) operator).getOpArgs().setOpName("test");
+        ((AbstractOperator) operator).getOpArgs()
+            .setConfig(new HashMap<String, String>() {{
+                put(ENABLE_DETAIL_METRIC.getKey(), "false");
+            }});
 
         List<ICollector> collectors = new ArrayList<>();
         ICollector collector = mock(ICollector.class);
@@ -76,7 +83,8 @@
         collectors.add(collector);
         collectors.add(collector);
         long startWindowId = 0;
-        Operator.OpContext context = new AbstractOperator.DefaultOpContext(collectors, new TestRuntimeContext());
+        Operator.OpContext context = new AbstractOperator.DefaultOpContext(collectors,
+            new TestRuntimeContext());
         operator.open(context);
 
         Assert.assertEquals(startWindowId, ReflectionUtil.getField(operator, "windowId"));
@@ -98,6 +106,10 @@
             super(new Configuration());
         }
 
+        public TestRuntimeContext(Map<String, String> opConfig) {
+            super(new Configuration(opConfig));
+        }
+
         @Override
         public long getPipelineId() {
             return 0;
@@ -115,12 +127,12 @@
 
         @Override
         public Configuration getConfiguration() {
-            return new Configuration();
+            return jobConfig;
         }
 
         @Override
         public RuntimeContext clone(Map<String, String> opConfig) {
-            return new TestRuntimeContext();
+            return new TestRuntimeContext(opConfig);
         }
 
         @Override
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java
index 549538d..4b32e73 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java
@@ -17,7 +17,6 @@
 import static com.antgroup.geaflow.cluster.constants.ClusterConstants.ENV_AGENT_PORT;
 import static com.antgroup.geaflow.cluster.constants.ClusterConstants.ENV_SUPERVISOR_PORT;
 import static com.antgroup.geaflow.cluster.constants.ClusterConstants.EXIT_CODE;
-import static com.antgroup.geaflow.cluster.constants.ClusterConstants.IS_RECOVER;
 import static com.antgroup.geaflow.cluster.constants.ClusterConstants.MASTER_ID;
 import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.AGENT_HTTP_PORT;
 import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SUPERVISOR_RPC_PORT;
@@ -30,6 +29,7 @@
 import com.antgroup.geaflow.cluster.runner.util.ClusterUtils;
 import com.antgroup.geaflow.cluster.runner.util.RunnerRuntimeHook;
 import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.utils.ProcessUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,14 +59,10 @@
 
             String id = ClusterUtils.getProperty(ClusterConstants.CONTAINER_ID);
             String masterId = ClusterUtils.getProperty(MASTER_ID);
-            boolean isRecover = Boolean.parseBoolean(
-                ClusterUtils.getProperty(IS_RECOVER));
-            LOGGER.info("ResourceID assigned for this container:{} masterId:{}, isRecover:{}", id,
-                masterId, isRecover);
+            LOGGER.info("ResourceID assigned for this container:{} masterId:{}", id, masterId);
 
             Configuration config = ClusterUtils.loadConfiguration();
             config.setMasterId(masterId);
-
             String supervisorPort = ClusterUtils.getEnvValue(System.getenv(), ENV_SUPERVISOR_PORT);
             config.put(SUPERVISOR_RPC_PORT, supervisorPort);
             String agentPort = ClusterUtils.getEnvValue(System.getenv(), ENV_AGENT_PORT);
@@ -76,14 +72,13 @@
             new RunnerRuntimeHook(ContainerRunner.class.getSimpleName(),
                 Integer.parseInt(supervisorPort)).start();
 
-            ContainerContext context = new ContainerContext(Integer.parseInt(id), config,
-                isRecover);
+            ContainerContext context = new ContainerContext(Integer.parseInt(id), config);
             ContainerRunner containerRunner = new ContainerRunner(context);
             containerRunner.run();
             LOGGER.info("Completed container init in {}ms", System.currentTimeMillis() - startTime);
             containerRunner.waitForTermination();
         } catch (Throwable e) {
-            LOGGER.error("FATAL: process exits", e);
+            LOGGER.error("FATAL: process {} exits", ProcessUtil.getProcessId(), e);
             System.exit(EXIT_CODE);
         }
     }
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java
index acd7a07..8b90a54 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java
@@ -29,25 +29,22 @@
 import com.antgroup.geaflow.cluster.runner.util.ClusterUtils;
 import com.antgroup.geaflow.cluster.runner.util.RunnerRuntimeHook;
 import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.utils.ProcessUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DriverRunner {
     private static final Logger LOGGER = LoggerFactory.getLogger(DriverRunner.class);
-    private final int driverId;
-    private final int driverIndex;
-    private final Configuration config;
+
+    private final DriverContext driverContext;
     private Driver driver;
 
-    public DriverRunner(int driverId, int driverIndex, Configuration config) {
-        this.driverId = driverId;
-        this.driverIndex = driverIndex;
-        this.config = config;
+    public DriverRunner(DriverContext driverContext) {
+        this.driverContext = driverContext;
     }
 
     public void run() {
-        DriverContext driverContext = new DriverContext(driverId, driverIndex, config);
-        int rpcPort = config.getInteger(DRIVER_RPC_PORT);
+        int rpcPort = driverContext.getConfig().getInteger(DRIVER_RPC_PORT);
         driver = new Driver(rpcPort);
         driverContext.load();
         driver.init(driverContext);
@@ -69,7 +66,6 @@
 
             Configuration config = ClusterUtils.loadConfiguration();
             config.setMasterId(masterId);
-
             String supervisorPort = ClusterUtils.getEnvValue(System.getenv(), ENV_SUPERVISOR_PORT);
             config.put(SUPERVISOR_RPC_PORT, supervisorPort);
             String agentPort = ClusterUtils.getEnvValue(System.getenv(), ENV_AGENT_PORT);
@@ -79,13 +75,14 @@
             new RunnerRuntimeHook(DriverRunner.class.getSimpleName(),
                 Integer.parseInt(supervisorPort)).start();
 
-            DriverRunner driverRunner = new DriverRunner(Integer.parseInt(id),
+            DriverContext context = new DriverContext(Integer.parseInt(id),
                 Integer.parseInt(index), config);
+            DriverRunner driverRunner = new DriverRunner(context);
             driverRunner.run();
             LOGGER.info("Completed driver init in {} ms", System.currentTimeMillis() - startTime);
             driverRunner.waitForTermination();
         } catch (Throwable e) {
-            LOGGER.error("FATAL: process exits", e);
+            LOGGER.error("FATAL: process {} exits", ProcessUtil.getProcessId(), e);
             System.exit(EXIT_CODE);
         }
     }
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java
index 37a0723..8289e1d 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java
@@ -27,6 +27,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * This strategy has the master restart the whole cluster once an anomaly is detected.
+ */
 public class ClusterFailoverStrategy extends AbstractFailoverStrategy {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClusterFailoverStrategy.class);
 
@@ -39,28 +42,32 @@
     @Override
     public void init(ClusterContext context) {
         super.init(context);
+        // True while recovering; reset to false once recovery has finished.
         this.doKilling = new AtomicBoolean(context.isRecover());
+        // Disable worker process auto-restart because master will do that.
         context.getConfig().put(PROCESS_AUTO_RESTART, Boolean.FALSE.toString());
+        LOGGER.info("init with recovering: {}", context.isRecover());
     }
 
     @Override
     public void doFailover(int componentId, Throwable cause) {
-        boolean isMasterRestarts = componentId == DEFAULT_MASTER_ID;
+        boolean isMasterRestarts = (componentId == DEFAULT_MASTER_ID);
         if (isMasterRestarts) {
+            // Master was started in recover mode: restart all drivers and containers.
             final long startTime = System.currentTimeMillis();
             clusterManager.restartAllDrivers();
             clusterManager.restartAllContainers();
             doKilling.set(false);
-            String finishMessage = String.format("Completed cluster failover in %s ms.",
+            String finishMessage = String.format("Completed failover in %s ms.",
                 System.currentTimeMillis() - startTime);
             LOGGER.info(finishMessage);
             reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_FINISH, finishMessage);
         } else if (doKilling.compareAndSet(false, true)) {
             String reason = cause == null ? null : cause.getMessage();
-            String startMessage = String.format("Start master cluster failover triggered by "
-                    + "component #%s: %s.", componentId, reason);
+            String startMessage = String.format("Start failover due to %s", reason);
             LOGGER.info(startMessage);
-            reportFailoverEvent(ExceptionLevel.ERROR, EventLabel.FAILOVER_START, startMessage);
+            reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_START, startMessage);
+            // Trigger process restart.
             System.exit(EXIT_CODE);
         }
     }
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java
index 9276e07..1c09365 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java
@@ -19,8 +19,14 @@
 import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
 import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
 import com.antgroup.geaflow.env.IEnvironment.EnvType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * This strategy has the supervisor, rather than the master, restart the process.
+ */
 public class ComponentFailoverStrategy extends AbstractFailoverStrategy {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ComponentFailoverStrategy.class);
 
     public ComponentFailoverStrategy(EnvType envType) {
         super(envType);
@@ -30,6 +36,7 @@
     public void init(ClusterContext context) {
         super.init(context);
         context.getConfig().put(PROCESS_AUTO_RESTART, AutoRestartPolicy.UNEXPECTED.getValue());
+        LOGGER.info("init with foRestarts: {}", context.getClusterConfig().getMaxRestarts());
     }
 
     @Override
diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java
index 5fda040..f9f4a4b 100644
--- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java
+++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java
@@ -30,8 +30,10 @@
 import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
 import com.antgroup.geaflow.common.utils.SleepUtils;
 import com.antgroup.geaflow.env.IEnvironment.EnvType;
+import com.antgroup.geaflow.env.args.EnvironmentArgumentParser;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.Map;
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,13 +52,22 @@
         ClusterStartedCallback callback = ClusterCallbackFactory.createClusterStartCallback(config);
         try {
             System.setProperty(CLUSTER_TYPE, EnvType.K8S.name());
+
             userClass = config.getString(USER_MAIN_CLASS);
+            LOGGER.info("execute mainClass {} to k8s, args: {}", userClass, classArgs);
+
+            EnvironmentArgumentParser parser = new EnvironmentArgumentParser();
+            Map<String, String> newConfig = parser.parse(new String[]{classArgs});
+            config.putAll(newConfig);
+
+            callback = ClusterCallbackFactory.createClusterStartCallback(config);
+            LOGGER.info("client callback: {}", callback.getClass().getCanonicalName());
+
             Class<?> mainClazz = Thread.currentThread().getContextClassLoader().loadClass(userClass);
             Method mainMethod = mainClazz.getMethod("main", String[].class);
-            LOGGER.info("execute mainClass {} to k8s, params: {}", userClass, classArgs);
             mainMethod.invoke(mainClazz, (Object) new String[] {classArgs});
         } catch (Throwable e) {
-            LOGGER.error("execute mainClass {} failed: {}", userClass, e.getMessage());
+            LOGGER.error("execute mainClass {} failed", userClass, e);
             callback.onFailure(e);
             throw new GeaflowRuntimeException(e);
         } finally {