fix cluster failover (#450)
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java
index 5a9b352..2bb6ed3 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/exception/GeaflowHeartbeatException.java
@@ -22,7 +22,11 @@
super(MESSAGE);
}
- public GeaflowHeartbeatException(Throwable cause) {
+ /**
+ * Creates a heartbeat exception carrying a caller-supplied description.
+ *
+ * @param message description of the heartbeat failure
+ */
+ public GeaflowHeartbeatException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a heartbeat exception with a caller-supplied description and a root cause.
+ *
+ * @param message description of the heartbeat failure
+ * @param cause the underlying error that triggered the failure
+ */
+ public GeaflowHeartbeatException(String message, Throwable cause) {
- super(MESSAGE, cause);
+ // Forward the caller's message; passing the MESSAGE constant here silently dropped it.
+ super(message, cause);
}
}
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java
index 22cfe06..3ebe8a3 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/utils/ProcessUtil.java
@@ -20,7 +20,9 @@
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Field;
import java.net.InetAddress;
+import java.util.List;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,14 +114,23 @@
return pid;
}
- public static synchronized void killProcess(int pid) {
- LOGGER.info("Kill -9 {}", pid);
+ /**
+ * Force-kills a single process with {@code kill -9}.
+ *
+ * @param pid id of the process to kill
+ */
+ public static void killProcess(int pid) {
+ execute("kill -9 " + pid);
+ }
+
+ /**
+ * Force-kills several processes with a single {@code kill -9} invocation.
+ *
+ * @param pidList ids of the processes to kill; nothing is executed when null or empty
+ */
+ public static void killProcesses(List<Integer> pidList) {
+ // Without this guard the command would degrade to "kill -9 " or "kill -9 null".
+ if (pidList == null || pidList.isEmpty()) {
+ return;
+ }
+ String pids = StringUtils.join(pidList, " ");
+ execute("kill -9 " + pids);
+ }
+
+ /**
+ * Runs a command line via {@link Runtime#exec(String)} and blocks until it exits.
+ * The exit code of the command is not inspected.
+ *
+ * @param cmd command line to run
+ * @throws GeaflowRuntimeException if the command cannot be started or the wait is interrupted
+ */
+ public static void execute(String cmd) {
+ LOGGER.info(cmd);
try {
- Process process = Runtime.getRuntime().exec("kill -9 " + pid);
+ Process process = Runtime.getRuntime().exec(cmd);
process.waitFor();
} catch (InterruptedException | IOException e) {
- LOGGER.error("Kill {} failed: {}", pid, e.getMessage());
- throw new GeaflowRuntimeException(e);
+ if (e instanceof InterruptedException) {
+ // Restore the interrupt flag so callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
+ // The message fills the second placeholder; the trailing throwable keeps the stack trace.
+ LOGGER.error("{} failed: {}", cmd, e.getMessage(), e);
+ throw new GeaflowRuntimeException(e.getMessage(), e);
}
}
diff --git a/geaflow/geaflow-common/src/test/java/com/antgroup/geaflow/common/utils/ProcessUtilTest.java b/geaflow/geaflow-common/src/test/java/com/antgroup/geaflow/common/utils/ProcessUtilTest.java
new file mode 100644
index 0000000..fda14f5
--- /dev/null
+++ b/geaflow/geaflow-common/src/test/java/com/antgroup/geaflow/common/utils/ProcessUtilTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2023 AntGroup CO., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.antgroup.geaflow.common.utils;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertThrows;
+
+import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
+import java.io.IOException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests the error handling of {@link ProcessUtil#execute(String)}.
+ *
+ * <p>NOTE(review): the mocked {@code Runtime} and {@code Process} below are never passed to
+ * {@code ProcessUtil}, so their stubbing presumably does not influence the code under test.
+ * Both cases likely pass only because "some command" cannot be launched - confirm.
+ */
+public class ProcessUtilTest {
+
+ @Mock
+ private Runtime runtime;
+
+ @Mock
+ private Process process;
+
+ /** Creates the mocks and makes the mocked runtime hand out the mocked process. */
+ @BeforeMethod
+ public void setUp() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ when(runtime.exec(anyString())).thenReturn(process);
+ }
+
+ /** An I/O failure while launching the command must surface as GeaflowRuntimeException. */
+ @Test
+ public void execute_CommandThrowsIOException_ExceptionHandled() throws IOException, InterruptedException {
+ when(runtime.exec(anyString())).thenThrow(new IOException("IO error"));
+
+ assertExecuteFails("some command");
+ }
+
+ /** An interrupted wait must surface as GeaflowRuntimeException. */
+ @Test
+ public void execute_CommandThrowsInterruptedException_ExceptionHandled() throws IOException, InterruptedException {
+ when(runtime.exec(anyString())).thenReturn(process);
+ doThrow(new InterruptedException("Interrupted")).when(process).waitFor();
+
+ assertExecuteFails("some command");
+ }
+
+ /** Asserts that executing the given command line raises GeaflowRuntimeException. */
+ private static void assertExecuteFails(String commandLine) {
+ assertThrows(GeaflowRuntimeException.class, () -> ProcessUtil.execute(commandLine));
+ }
+}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java
index 0f97888..816684a 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/ClusterContext.java
@@ -102,19 +102,21 @@
ClusterMetaStore metaStore = ClusterMetaStore.getInstance(id, name, config);
Map<Integer, String> drivers = metaStore.getDriverIds();
Map<Integer, String> containerIds = metaStore.getContainerIds();
- if (drivers != null && !drivers.isEmpty() && containerIds != null && !containerIds
- .isEmpty()) {
+ int driverNum = drivers == null ? 0 : drivers.size();
+ int containerNum = containerIds == null ? 0 : containerIds.size();
+ if (driverNum != 0 && containerNum != 0) {
this.isRecover = true;
this.driverIds = drivers;
this.containerIds = containerIds;
this.maxComponentId = metaStore.getMaxContainerId();
- LOGGER.info("recover {} containers and {} drivers maxComponentId {} from metaStore",
- containerIds.size(), drivers.size(), maxComponentId);
+ LOGGER.info("recover {} containers and {} drivers maxComponentId: {}",
+ containerNum, driverNum, maxComponentId);
} else {
this.isRecover = false;
this.driverIds = new ConcurrentHashMap<>();
this.containerIds = new ConcurrentHashMap<>();
this.maxComponentId = 0;
+ LOGGER.info("init with maxComponentId: {}", maxComponentId);
}
}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java
index e5e5d64..3bae59e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/PipelineInputFetcher.java
@@ -158,9 +158,12 @@
Thread.currentThread().setName(this.name);
try {
this.fetch();
+ } catch (GeaflowRuntimeException e) {
+ LOGGER.error("fetcher task err with window id {} {}", this.targetWindowId, this.name, e);
+ throw e;
} catch (Throwable e) {
LOGGER.error("fetcher task err with window id {} {}", this.targetWindowId, this.name, e);
- throw new GeaflowRuntimeException(e);
+ throw new GeaflowRuntimeException(e.getMessage(), e);
}
}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java
index 30eb46d..ce77c4c 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManager.java
@@ -63,9 +63,8 @@
private final ScheduledFuture<?> reportFuture;
private final ScheduledExecutorService checkTimeoutService;
private final ScheduledExecutorService heartbeatReportService;
- private final GeaflowHeartbeatException timeoutException;
private ScheduledFuture<?> checkFuture;
- private IStatsWriter statsWriter;
+ private final IStatsWriter statsWriter;
public HeartbeatManager(Configuration config, IClusterManager clusterManager) {
this.senderMap = new ConcurrentHashMap<>();
@@ -86,6 +85,7 @@
this.checkFuture = checkTimeoutService.scheduleAtFixedRate(this::checkSupervisorHealth,
heartbeatCheckMs, heartbeatCheckMs, TimeUnit.MILLISECONDS);
}
+
this.heartbeatReportService = new ScheduledThreadPoolExecutor(1,
ThreadUtil.namedThreadFactory(true, "heartbeat-report"));
this.reportFuture = heartbeatReportService
@@ -93,7 +93,6 @@
TimeUnit.MILLISECONDS);
this.clusterManager = (AbstractClusterManager) clusterManager;
- this.timeoutException = new GeaflowHeartbeatException();
this.statsWriter = StatsCollectorFactory.init(config).getStatsWriter();
}
@@ -107,7 +106,7 @@
this.statsWriter.addMetric(masterInfo.getName(), masterInfo);
}
- private void checkHeartBeat() {
+ void checkHeartBeat() {
try {
long checkTime = System.currentTimeMillis();
checkTimeout(clusterManager.getContainerIds(), checkTime);
@@ -128,8 +127,9 @@
LOGGER.warn("{} is not registered", entry.getValue());
}
} else if (checkTime > heartbeat.getTimestamp() + heartbeatTimeoutMs) {
- LOGGER.error("{} heartbeat is missing", entry.getValue());
- clusterManager.doFailover(componentId, timeoutException);
+ String message = String.format("%s heartbeat is lost", entry.getValue());
+ LOGGER.error(message);
+ doFailover(componentId, new GeaflowHeartbeatException(message));
}
}
}
@@ -147,7 +147,7 @@
checkSupervisorHealth(clusterManager.getContainerIds());
checkSupervisorHealth(clusterManager.getDriverIds());
} catch (Throwable e) {
- LOGGER.warn("Check container healthy error", e);
+ LOGGER.warn("Check container healthy error: {}", e.getMessage(), e);
}
}
@@ -157,17 +157,23 @@
try {
StatusResponse response = RpcClient.getInstance().querySupervisorStatus(name);
if (!response.getIsAlive()) {
- LOGGER.info("Found {} is not alive and do failover", name);
- clusterManager.doFailover(entry.getKey(), timeoutException);
+ String message = String.format("supervisor of %s is not alive", name);
+ LOGGER.error(message);
+ doFailover(entry.getKey(), new GeaflowHeartbeatException(message));
}
} catch (Throwable e) {
- LOGGER.error("Try to do failover due to exception from {}: {}", name,
- e.getMessage());
- clusterManager.doFailover(entry.getKey(), e);
+ String message = String.format("%s %s", name, e.getMessage());
+ LOGGER.error(message, e);
+ doFailover(entry.getKey(), new GeaflowHeartbeatException(message, e));
}
}
}
+ /**
+ * Reports the failure to the exception collector and then asks the cluster manager to
+ * fail over the given component.
+ *
+ * @param componentId id of the component to fail over
+ * @param e the error that triggered the failover
+ */
+ void doFailover(int componentId, Throwable e) {
+ // Report before delegating so the error is recorded ahead of the failover call.
+ StatsCollectorFactory.getInstance()
+ .getExceptionCollector()
+ .reportException(e);
+ clusterManager.doFailover(componentId, e);
+ }
+
protected boolean isRegistered(int componentId) {
AbstractClusterManager cm = clusterManager;
return cm.getContainerInfos().containsKey(componentId) || cm.getDriverInfos()
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java
index a68a1df..f89f54d 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java
@@ -66,22 +66,28 @@
int code = childProcess.waitFor();
LOGGER.warn("Child process {} exits with code: {} and alive: {}", pid, code,
childProcess.isAlive());
- if (code == 0) {
- break;
+ // 0: success, 137: killed by SIGKILL, 143: killed by SIGTERM
+ if (code == 0 || code == 137 || code == 143) {
+ return;
}
if (restarts == 0) {
- String errMsg = String.format("Latest process %s exits with code: %s: "
- + "Exhausted after retrying startup %s times. ", pid, code, maxRestarts + 1);
+ String errMsg;
+ if (maxRestarts == 0) {
+ errMsg = String.format("process exits code: %s", code);
+ } else {
+ errMsg = String.format("process exits code: %s, exhausted %s restarts",
+ code, maxRestarts);
+ }
throw new GeaflowRuntimeException(errMsg);
}
restarts--;
} while (true);
- } catch (Exception e) {
+ } catch (GeaflowRuntimeException e) {
LOGGER.error("FATAL: start command failed: {}", command, e);
- if (e instanceof GeaflowRuntimeException) {
- throw (GeaflowRuntimeException) e;
- }
- throw new GeaflowRuntimeException(e);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("FATAL: start command failed: {}", command, e);
+ throw new GeaflowRuntimeException(e.getMessage(), e);
}
}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java
index 373c68d..16ce654 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/Supervisor.java
@@ -33,7 +33,9 @@
import com.baidu.brpc.server.RpcServerOptions;
import com.google.common.base.Preconditions;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,23 +106,29 @@
return false;
}
+ /**
+ * Stops the worker process currently tracked by the main runner.
+ */
+ public void stopWorker() {
+ stopWorker(mainRunner.getProcessId());
+ }
+
+ /**
+ * Kills the given process and, when the runner's live process has a different pid,
+ * that process as well.
+ *
+ * @param pid id of the process to kill; must be larger than 0
+ */
public void stopWorker(int pid) {
Preconditions.checkArgument(pid > 0, "pid should be larger than 0");
- LOGGER.info("Kill process: {}", pid);
- ProcessUtil.killProcess(pid);
+ LOGGER.info("Stop old process if exists");
+
+ List<Integer> pids = new ArrayList<>();
+ pids.add(pid);
Process process = mainRunner.getProcess();
if (process.isAlive()) {
- int ppid = ProcessUtil.getProcessPid(process);
- if (ppid <= 0) {
- LOGGER.warn("NOT found live process {}", process);
- return;
- }
- if (pid != ppid) {
- LOGGER.info("Kill parent process: {}", ppid);
- process.destroy();
+ int curPid = mainRunner.getProcessId();
+ if (curPid <= 0) {
+ // Do not bail out here: the requested pid collected above must still be killed.
+ LOGGER.warn("Process is alive but pid not found: {}", process);
+ } else if (pid != curPid) {
+ pids.add(curPid);
}
}
+ ProcessUtil.killProcesses(pids);
}
public void startAgent() {
@@ -139,4 +147,10 @@
}
}
+ /**
+ * Stops the RPC service if one exists; otherwise does nothing.
+ */
+ public void stop() {
+ if (rpcService == null) {
+ return;
+ }
+ rpcService.stopService();
+ }
+
}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java
index 5993cc8..71915d0 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/system/ClusterMetaStore.java
@@ -42,7 +42,7 @@
private final String clusterId;
private final Configuration configuration;
private final IClusterMetaKVStore<String, Object> componentBackend;
- private final Map<String, IClusterMetaKVStore<String, Object>> backends;
+ private Map<String, IClusterMetaKVStore<String, Object>> backends;
private ClusterMetaStore(int id, String name, Configuration configuration) {
this.componentId = id;
@@ -79,7 +79,9 @@
public static synchronized void close() {
LOGGER.info("close ClusterMetaStore");
if (INSTANCE != null) {
- for (IClusterMetaKVStore<String, Object> backend : INSTANCE.backends.values()) {
+ Map<String, IClusterMetaKVStore<String, Object>> backends = INSTANCE.backends;
+ INSTANCE.backends = null;
+ for (IClusterMetaKVStore<String, Object> backend : backends.values()) {
backend.close();
}
INSTANCE = null;
@@ -220,6 +222,10 @@
default:
return componentBackend;
}
+ // Cluster meta store is closed.
+ if (backends == null) {
+ return null;
+ }
if (!backends.containsKey(namespace)) {
synchronized (ClusterMetaStore.class) {
if (!backends.containsKey(namespace)) {
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManagerTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManagerTest.java
new file mode 100644
index 0000000..a3e5b48
--- /dev/null
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/heartbeat/HeartbeatManagerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2023 AntGroup CO., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.antgroup.geaflow.cluster.heartbeat;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertEquals;
+
+import com.antgroup.geaflow.cluster.clustermanager.AbstractClusterManager;
+import com.antgroup.geaflow.cluster.container.ContainerInfo;
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
+import com.antgroup.geaflow.common.exception.GeaflowHeartbeatException;
+import com.antgroup.geaflow.common.heartbeat.Heartbeat;
+import com.antgroup.geaflow.common.utils.SleepUtils;
+import com.antgroup.geaflow.rpc.proto.Master.HeartbeatResponse;
+import com.antgroup.geaflow.stats.sink.IStatsWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link HeartbeatManager} backed by a mocked {@link AbstractClusterManager}.
+ */
+public class HeartbeatManagerTest {
+
+ @Mock
+ private AbstractClusterManager clusterManager;
+
+ @Mock
+ private IStatsWriter statsWriter;
+
+ private HeartbeatManager heartbeatManager;
+
+ /** Builds a manager with a 500 ms heartbeat timeout and 60 s delay/interval settings. */
+ @BeforeMethod
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ Configuration config = new Configuration();
+ config.put(ExecutionConfigKeys.HEARTBEAT_TIMEOUT_MS, "500");
+ config.put(ExecutionConfigKeys.HEARTBEAT_INTERVAL_MS, "60000");
+ config.put(ExecutionConfigKeys.HEARTBEAT_INITIAL_DELAY_MS, "60000");
+ heartbeatManager = new HeartbeatManager(config, clusterManager);
+ }
+
+ /** Releases the manager created for the test. */
+ @AfterMethod
+ public void tearDown() {
+ heartbeatManager.close();
+ }
+
+ /** A heartbeat from a known container is acknowledged and flagged as registered. */
+ @Test
+ public void receivedHeartbeat_RegisteredHeartbeat_ReturnsSuccessAndRegistered() {
+ HashMap<Integer, ContainerInfo> knownContainers = new HashMap<>();
+ knownContainers.put(1, new ContainerInfo());
+ when(clusterManager.getContainerInfos()).thenReturn(knownContainers);
+
+ HeartbeatResponse response = heartbeatManager.receivedHeartbeat(new Heartbeat(1));
+
+ assertEquals(true, response.getSuccess());
+ assertEquals(true, response.getRegistered());
+ }
+
+ /** A heartbeat from an unknown component is acknowledged but flagged as not registered. */
+ @Test
+ public void receivedHeartbeat_UnregisteredHeartbeat_ReturnsSuccessAndNotRegistered() {
+ when(clusterManager.getContainerInfos()).thenReturn(new HashMap<Integer, ContainerInfo>());
+
+ HeartbeatResponse response = heartbeatManager.receivedHeartbeat(new Heartbeat(2));
+
+ assertEquals(true, response.getSuccess());
+ assertEquals(false, response.getRegistered());
+ }
+
+ /** A heartbeat older than the configured timeout triggers exactly one failover. */
+ @Test
+ public void checkHeartBeat_LogsWarningsAndErrors() {
+ Map<Integer, String> containers = new HashMap<>();
+ containers.put(1, "container1");
+ when(clusterManager.getDriverIds()).thenReturn(new HashMap<Integer, String>());
+ when(clusterManager.getContainerIds()).thenReturn(containers);
+
+ heartbeatManager.receivedHeartbeat(new Heartbeat(1));
+ // Sleep past the 500 ms timeout configured in setUp.
+ SleepUtils.sleepMilliSecond(600);
+ heartbeatManager.checkHeartBeat();
+
+ verify(clusterManager, times(1)).doFailover(eq(1), isA(GeaflowHeartbeatException.class));
+ }
+
+ /** doFailover hands the component id and the original error to the cluster manager. */
+ @Test
+ public void doFailover_ReportsExceptionAndCallsFailover() {
+ int id = 1;
+ Throwable failure = new RuntimeException("Test exception");
+
+ heartbeatManager.doFailover(id, failure);
+
+ verify(clusterManager, times(1)).doFailover(eq(id), eq(failure));
+ }
+}
\ No newline at end of file
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/runner/SupervisorTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/runner/SupervisorTest.java
new file mode 100644
index 0000000..90fe276
--- /dev/null
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/runner/SupervisorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2023 AntGroup CO., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package com.antgroup.geaflow.cluster.runner;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.utils.SleepUtils;
+import java.util.HashMap;
+import java.util.Map;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that a {@link Supervisor} can start a worker process and stop it again.
+ */
+public class SupervisorTest {
+
+ /** Upper bound for waiting on the worker to start, so a failed start cannot hang the run. */
+ private static final long START_TIMEOUT_MS = 30000L;
+
+ /** Starts a long-running worker, stops it, and checks that it is no longer alive. */
+ @Test
+ public void test() {
+ Configuration configuration = new Configuration();
+ Map<String, String> envMap = new HashMap<>();
+ String cmd = "sleep 30000";
+ Supervisor supervisor = new Supervisor(cmd, configuration, false);
+ try {
+ supervisor.startWorker();
+ // wait for process starts, but give up after the deadline instead of spinning forever.
+ long deadline = System.currentTimeMillis() + START_TIMEOUT_MS;
+ while (!supervisor.isWorkerAlive()) {
+ Assert.assertTrue(System.currentTimeMillis() < deadline,
+ "worker did not start within " + START_TIMEOUT_MS + " ms");
+ SleepUtils.sleepMilliSecond(500);
+ }
+ supervisor.stopWorker();
+ // wait for process exits.
+ SleepUtils.sleepSecond(1);
+ Assert.assertFalse(supervisor.isWorkerAlive());
+ } finally {
+ // Always stop the supervisor, even when an assertion above fails.
+ supervisor.stop();
+ }
+ }
+}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java b/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java
index cf9da1b..a92b4bc 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-shuffle/src/main/java/com/antgroup/geaflow/shuffle/pipeline/slice/SpillablePipelineSlice.java
@@ -258,7 +258,7 @@
}
}
} catch (IOException e) {
- throw new GeaflowRuntimeException(e);
+ throw new GeaflowRuntimeException(e.getMessage(), e);
}
return false;
diff --git a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java
index 6c7f538..dfd39f3 100644
--- a/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java
+++ b/geaflow/geaflow-core/geaflow-runtime/geaflow-operator/src/test/java/com/antgroup/geaflow/operator/impl/graph/compute/dynamic/DynamicGraphVertexCentricComputeOpTest.java
@@ -14,6 +14,7 @@
package com.antgroup.geaflow.operator.impl.graph.compute.dynamic;
+import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.ENABLE_DETAIL_METRIC;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,6 +40,7 @@
import com.antgroup.geaflow.view.IViewDesc;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.testng.Assert;
@@ -55,7 +57,8 @@
Integer.class, ValueEdge.class, IntegerType.class))
.build();
- DynamicGraphVertexCentricComputeOp operator = new DynamicGraphVertexCentricComputeOp(graphViewDesc, new IncVertexCentricCompute(5) {
+ DynamicGraphVertexCentricComputeOp operator = new DynamicGraphVertexCentricComputeOp(
+ graphViewDesc, new IncVertexCentricCompute(5) {
@Override
public IncVertexCentricComputeFunction getIncComputeFunction() {
return mock(IncVertexCentricComputeFunction.class);
@@ -66,7 +69,11 @@
return null;
}
});
- ((AbstractOperator)operator).getOpArgs().setOpName("test");
+ ((AbstractOperator) operator).getOpArgs().setOpName("test");
+ ((AbstractOperator) operator).getOpArgs()
+ .setConfig(new HashMap<String, String>() {{
+ put(ENABLE_DETAIL_METRIC.getKey(), "false");
+ }});
List<ICollector> collectors = new ArrayList<>();
ICollector collector = mock(ICollector.class);
@@ -76,7 +83,8 @@
collectors.add(collector);
collectors.add(collector);
long startWindowId = 0;
- Operator.OpContext context = new AbstractOperator.DefaultOpContext(collectors, new TestRuntimeContext());
+ Operator.OpContext context = new AbstractOperator.DefaultOpContext(collectors,
+ new TestRuntimeContext());
operator.open(context);
Assert.assertEquals(startWindowId, ReflectionUtil.getField(operator, "windowId"));
@@ -98,6 +106,10 @@
super(new Configuration());
}
+ public TestRuntimeContext(Map<String, String> opConfig) {
+ super(new Configuration(opConfig));
+ }
+
@Override
public long getPipelineId() {
return 0;
@@ -115,12 +127,12 @@
@Override
public Configuration getConfiguration() {
- return new Configuration();
+ return jobConfig;
}
@Override
public RuntimeContext clone(Map<String, String> opConfig) {
- return new TestRuntimeContext();
+ return new TestRuntimeContext(opConfig);
}
@Override
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java
index 549538d..4b32e73 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/ContainerRunner.java
@@ -17,7 +17,6 @@
import static com.antgroup.geaflow.cluster.constants.ClusterConstants.ENV_AGENT_PORT;
import static com.antgroup.geaflow.cluster.constants.ClusterConstants.ENV_SUPERVISOR_PORT;
import static com.antgroup.geaflow.cluster.constants.ClusterConstants.EXIT_CODE;
-import static com.antgroup.geaflow.cluster.constants.ClusterConstants.IS_RECOVER;
import static com.antgroup.geaflow.cluster.constants.ClusterConstants.MASTER_ID;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.AGENT_HTTP_PORT;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SUPERVISOR_RPC_PORT;
@@ -30,6 +29,7 @@
import com.antgroup.geaflow.cluster.runner.util.ClusterUtils;
import com.antgroup.geaflow.cluster.runner.util.RunnerRuntimeHook;
import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.utils.ProcessUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,14 +59,10 @@
String id = ClusterUtils.getProperty(ClusterConstants.CONTAINER_ID);
String masterId = ClusterUtils.getProperty(MASTER_ID);
- boolean isRecover = Boolean.parseBoolean(
- ClusterUtils.getProperty(IS_RECOVER));
- LOGGER.info("ResourceID assigned for this container:{} masterId:{}, isRecover:{}", id,
- masterId, isRecover);
+ LOGGER.info("ResourceID assigned for this container:{} masterId:{}", id, masterId);
Configuration config = ClusterUtils.loadConfiguration();
config.setMasterId(masterId);
-
String supervisorPort = ClusterUtils.getEnvValue(System.getenv(), ENV_SUPERVISOR_PORT);
config.put(SUPERVISOR_RPC_PORT, supervisorPort);
String agentPort = ClusterUtils.getEnvValue(System.getenv(), ENV_AGENT_PORT);
@@ -76,14 +72,13 @@
new RunnerRuntimeHook(ContainerRunner.class.getSimpleName(),
Integer.parseInt(supervisorPort)).start();
- ContainerContext context = new ContainerContext(Integer.parseInt(id), config,
- isRecover);
+ ContainerContext context = new ContainerContext(Integer.parseInt(id), config);
ContainerRunner containerRunner = new ContainerRunner(context);
containerRunner.run();
LOGGER.info("Completed container init in {}ms", System.currentTimeMillis() - startTime);
containerRunner.waitForTermination();
} catch (Throwable e) {
- LOGGER.error("FATAL: process exits", e);
+ LOGGER.error("FATAL: process {} exits", ProcessUtil.getProcessId(), e);
System.exit(EXIT_CODE);
}
}
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java
index acd7a07..8b90a54 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/entrypoint/DriverRunner.java
@@ -29,25 +29,22 @@
import com.antgroup.geaflow.cluster.runner.util.ClusterUtils;
import com.antgroup.geaflow.cluster.runner.util.RunnerRuntimeHook;
import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.utils.ProcessUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DriverRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(DriverRunner.class);
- private final int driverId;
- private final int driverIndex;
- private final Configuration config;
+
+ private final DriverContext driverContext;
private Driver driver;
- public DriverRunner(int driverId, int driverIndex, Configuration config) {
- this.driverId = driverId;
- this.driverIndex = driverIndex;
- this.config = config;
+ public DriverRunner(DriverContext driverContext) {
+ this.driverContext = driverContext;
}
public void run() {
- DriverContext driverContext = new DriverContext(driverId, driverIndex, config);
- int rpcPort = config.getInteger(DRIVER_RPC_PORT);
+ int rpcPort = driverContext.getConfig().getInteger(DRIVER_RPC_PORT);
driver = new Driver(rpcPort);
driverContext.load();
driver.init(driverContext);
@@ -69,7 +66,6 @@
Configuration config = ClusterUtils.loadConfiguration();
config.setMasterId(masterId);
-
String supervisorPort = ClusterUtils.getEnvValue(System.getenv(), ENV_SUPERVISOR_PORT);
config.put(SUPERVISOR_RPC_PORT, supervisorPort);
String agentPort = ClusterUtils.getEnvValue(System.getenv(), ENV_AGENT_PORT);
@@ -79,13 +75,14 @@
new RunnerRuntimeHook(DriverRunner.class.getSimpleName(),
Integer.parseInt(supervisorPort)).start();
- DriverRunner driverRunner = new DriverRunner(Integer.parseInt(id),
+ DriverContext context = new DriverContext(Integer.parseInt(id),
Integer.parseInt(index), config);
+ DriverRunner driverRunner = new DriverRunner(context);
driverRunner.run();
LOGGER.info("Completed driver init in {} ms", System.currentTimeMillis() - startTime);
driverRunner.waitForTermination();
} catch (Throwable e) {
- LOGGER.error("FATAL: process exits", e);
+ LOGGER.error("FATAL: process {} exits", ProcessUtil.getProcessId(), e);
System.exit(EXIT_CODE);
}
}
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java
index 37a0723..8289e1d 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java
@@ -27,6 +27,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * This strategy has the master restart the whole cluster once an anomaly is detected.
+ */
public class ClusterFailoverStrategy extends AbstractFailoverStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterFailoverStrategy.class);
@@ -39,28 +42,32 @@
@Override
public void init(ClusterContext context) {
super.init(context);
+ // Set to true while recovering; reset to false once recovery has finished.
this.doKilling = new AtomicBoolean(context.isRecover());
+ // Disable worker process auto-restart because master will do that.
context.getConfig().put(PROCESS_AUTO_RESTART, Boolean.FALSE.toString());
+ LOGGER.info("init with recovering: {}", context.isRecover());
}
@Override
public void doFailover(int componentId, Throwable cause) {
- boolean isMasterRestarts = componentId == DEFAULT_MASTER_ID;
+ boolean isMasterRestarts = (componentId == DEFAULT_MASTER_ID);
if (isMasterRestarts) {
+ // The master restarts itself when the process is started in recover mode.
final long startTime = System.currentTimeMillis();
clusterManager.restartAllDrivers();
clusterManager.restartAllContainers();
doKilling.set(false);
- String finishMessage = String.format("Completed cluster failover in %s ms.",
+ String finishMessage = String.format("Completed failover in %s ms.",
System.currentTimeMillis() - startTime);
LOGGER.info(finishMessage);
reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_FINISH, finishMessage);
} else if (doKilling.compareAndSet(false, true)) {
String reason = cause == null ? null : cause.getMessage();
- String startMessage = String.format("Start master cluster failover triggered by "
- + "component #%s: %s.", componentId, reason);
+ String startMessage = String.format("Start failover due to %s", reason);
LOGGER.info(startMessage);
- reportFailoverEvent(ExceptionLevel.ERROR, EventLabel.FAILOVER_START, startMessage);
+ reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_START, startMessage);
+ // Trigger process restart.
System.exit(EXIT_CODE);
}
}
diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java
index 9276e07..1c09365 100644
--- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java
+++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ComponentFailoverStrategy.java
@@ -19,8 +19,14 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * This strategy has the supervisor, rather than the master, restart the process.
+ */
public class ComponentFailoverStrategy extends AbstractFailoverStrategy {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ComponentFailoverStrategy.class);
public ComponentFailoverStrategy(EnvType envType) {
super(envType);
@@ -30,6 +36,7 @@
public void init(ClusterContext context) {
super.init(context);
context.getConfig().put(PROCESS_AUTO_RESTART, AutoRestartPolicy.UNEXPECTED.getValue());
+ LOGGER.info("init with foRestarts: {}", context.getClusterConfig().getMaxRestarts());
}
@Override
diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java
index 5fda040..f9f4a4b 100644
--- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java
+++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/entrypoint/KubernetesClientRunner.java
@@ -30,8 +30,10 @@
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.SleepUtils;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
+import com.antgroup.geaflow.env.args.EnvironmentArgumentParser;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,13 +52,22 @@
ClusterStartedCallback callback = ClusterCallbackFactory.createClusterStartCallback(config);
try {
System.setProperty(CLUSTER_TYPE, EnvType.K8S.name());
+
userClass = config.getString(USER_MAIN_CLASS);
+ LOGGER.info("execute mainClass {} to k8s, args: {}", userClass, classArgs);
+
+ EnvironmentArgumentParser parser = new EnvironmentArgumentParser();
+ Map<String, String> newConfig = parser.parse(new String[]{classArgs});
+ config.putAll(newConfig);
+
+ callback = ClusterCallbackFactory.createClusterStartCallback(config);
+ LOGGER.info("client callback: {}", callback.getClass().getCanonicalName());
+
Class<?> mainClazz = Thread.currentThread().getContextClassLoader().loadClass(userClass);
Method mainMethod = mainClazz.getMethod("main", String[].class);
- LOGGER.info("execute mainClass {} to k8s, params: {}", userClass, classArgs);
mainMethod.invoke(mainClazz, (Object) new String[] {classArgs});
} catch (Throwable e) {
- LOGGER.error("execute mainClass {} failed: {}", userClass, e.getMessage());
+ LOGGER.error("execute mainClass {} failed", userClass, e);
callback.onFailure(e);
throw new GeaflowRuntimeException(e);
} finally {