[Cluster] Fix BRPC endpoint conflict in local multi-container deployment (#548)
* [Cluster] Fix BRPC endpoint conflict in local multi-container deployment (#547)
* Fix BRPC endpoint registration conflict when launching multiple containers in local mode
* Add geaflow.executor.thread.max.multiple configuration for thread pool scaling
* [Cluster] Make DEFAULT_MAX_MULTIPLE private
* [Cluster] Fix Checkstyle violations in Javadoc comments
* [Cluster] Fix remove unused import
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java
index 8c3528d..2838829 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java
@@ -281,6 +281,11 @@
.noDefaultValue()
.description("container max heap size in mb");
+ public static final ConfigKey EXECUTOR_MAX_MULTIPLE = ConfigKeys
+ .key("geaflow.executor.thread.max.multiple")
+ .defaultValue(10)
+ .description("Maximum thread pool size multiplier (maxThreads = multiple * available cores)");
+
public static final ConfigKey FO_ENABLE = ConfigKeys
.key("geaflow.fo.enable")
.defaultValue(true)
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java
index b4fb0d5..d7d5122 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java
@@ -35,6 +35,7 @@
private static final int DEFAULT_KEEP_ALIVE_MINUTES = 30;
private static final int DEFAULT_QUEUE_CAPACITY = 1024;
private static final int DEFAULT_MAGNIFICATION = 2;
+ private static final int DEFAULT_MAX_MULTIPLE = 10;
private static final Map<String, ExecutorService> BOUNDED_EXECUTORS = new HashMap<>();
private static final Map<String, ExecutorService> UNBOUNDED_EXECUTORS = new HashMap<>();
@@ -90,38 +91,43 @@
TimeUnit.MINUTES);
}
- public static synchronized ExecutorService getExecutorService(int coreNumber,
- String threadFormat) {
- int maxThreads = 10 * CORE_NUM;
- Preconditions.checkArgument(coreNumber > 0 && coreNumber <= maxThreads,
- "executor threads should be smaller than " + maxThreads);
- Preconditions.checkArgument(StringUtils.isNotEmpty(threadFormat),
- "thread format couldn't" + " be empty");
- return getNamedService(coreNumber, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MINUTES,
- TimeUnit.MINUTES, threadFormat, null);
+ public static ExecutorService getExecutorService(int coreNumber,
+ String threadFormat) {
+ return getExecutorService(DEFAULT_MAX_MULTIPLE, coreNumber, threadFormat, null);
}
- public static synchronized ExecutorService getExecutorService(int coreNumber,
+ public static ExecutorService getExecutorService(int coreNumber,
+ String threadFormat,
+ Thread.UncaughtExceptionHandler handler) {
+ return getExecutorService(DEFAULT_MAX_MULTIPLE, coreNumber, threadFormat, handler);
+ }
+
+ public static ExecutorService getExecutorService(int maxMultiple,
+ int coreNumber,
+ String threadFormat) {
+ return getExecutorService(maxMultiple, coreNumber, threadFormat, null);
+ }
+
+ /**
+ * Creates an ExecutorService with following params.
+ *
+ * @param maxMultiple Maximum threads multiplier
+ * @param coreNumber Number of core threads
+ * @param threadFormat Thread name format
+ * @param handler Uncaught exception handler
+ * @return Configured ExecutorService
+ */
+ public static synchronized ExecutorService getExecutorService(int maxMultiple,
+ int coreNumber,
String threadFormat,
Thread.UncaughtExceptionHandler handler) {
- int maxThreads = 10 * CORE_NUM;
- Preconditions.checkArgument(coreNumber > 0 && coreNumber <= maxThreads,
- "executor threads should be smaller than " + maxThreads);
- Preconditions.checkArgument(StringUtils.isNotEmpty(threadFormat),
- "thread format couldn't" + " be empty");
- return getNamedService(coreNumber, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MINUTES,
- TimeUnit.MINUTES, threadFormat, handler);
- }
-
- public static synchronized ExecutorService getExecutorService(int maxMultiple, int coreNumber,
- String threadFormat) {
int maxThreads = maxMultiple * CORE_NUM;
Preconditions.checkArgument(coreNumber > 0 && coreNumber <= maxThreads,
"executor threads should be smaller than " + maxThreads);
Preconditions.checkArgument(StringUtils.isNotEmpty(threadFormat),
"thread format couldn't" + " be empty");
return getNamedService(coreNumber, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MINUTES,
- TimeUnit.MINUTES, threadFormat, null);
+ TimeUnit.MINUTES, threadFormat, handler);
}
private static synchronized ExecutorService getNamedService(int bound, int capacity,
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java
index cea6445..8b9665b 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java
@@ -96,6 +96,7 @@
}
protected void startContainers(int containerNum) {
+ validateContainerNum(containerNum);
Map<Integer, String> containerIds = new HashMap<>();
for (int i = 0; i < containerNum; i++) {
int containerId = generateNextComponentId();
@@ -173,6 +174,9 @@
protected abstract IFailoverStrategy buildFailoverStrategy();
+ protected void validateContainerNum(int containerNum) {
+ }
+
@Override
public void doFailover(int componentId, Throwable cause) {
foStrategy.doFailover(componentId, cause);
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java
index cbf04dc..4b87d66 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java
@@ -29,12 +29,10 @@
private static final String EMITTER_FORMAT = "geaflow-emitter-%d";
private final int slots;
- private final Configuration configuration;
public EmitterService(int slots, Configuration configuration) {
- super(EMITTER_FORMAT);
+ super(configuration, EMITTER_FORMAT);
this.slots = slots;
- this.configuration = configuration;
}
protected EmitterRunner[] buildTaskRunner() {
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java
index 782169d..7eeb93b 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java
@@ -97,7 +97,7 @@
this.workerService = new TaskService(id, num,
configuration, metricGroup, fetcherService, emitterService);
this.dispatcher = new Dispatcher(workerService);
- this.dispatcherService = new DispatcherService(dispatcher);
+ this.dispatcherService = new DispatcherService(dispatcher, configuration);
// start task service
this.fetcherService.start();
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java
index 1878857..920965e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java
@@ -33,12 +33,10 @@
private static final String FETCHER_FORMAT = "geaflow-fetcher-%d";
private int slots;
- private Configuration configuration;
public FetcherService(int slots, Configuration configuration) {
- super(FETCHER_FORMAT);
+ super(configuration, FETCHER_FORMAT);
this.slots = slots;
- this.configuration = configuration;
}
@Override
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java
index bb6246a..be87bf3 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java
@@ -21,6 +21,8 @@
import com.antgroup.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
import com.antgroup.geaflow.cluster.task.runner.ITaskRunner;
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.thread.Executors;
import com.antgroup.geaflow.common.utils.ExecutorUtil;
import com.google.common.base.Preconditions;
@@ -35,21 +37,32 @@
protected ExecutorService executorService;
private R[] tasks;
private String threadFormat;
+ protected final Configuration configuration;
- public AbstractTaskService(String threadFormat) {
+ public AbstractTaskService(Configuration configuration, String threadFormat) {
this.threadFormat = threadFormat;
+ this.configuration = configuration;
}
public void start() {
this.tasks = buildTaskRunner();
Preconditions.checkArgument(tasks != null && tasks.length != 0, "must specify at least one task");
- this.executorService = Executors.getExecutorService(tasks.length, threadFormat,
+ this.executorService = Executors.getExecutorService(getMaxMultiple(), tasks.length, threadFormat,
ComponentUncaughtExceptionHandler.INSTANCE);
for (int i = 0; i < tasks.length; i++) {
executorService.execute(tasks[i]);
}
}
+ /**
+ * Provides the maximum thread multiplier value.
+ *
+ * @return the maximum thread multiplier
+ */
+ protected int getMaxMultiple() {
+ return configuration.getInteger(ExecutionConfigKeys.EXECUTOR_MAX_MULTIPLE);
+ }
+
public void process(int workerId, TASK task) {
tasks[workerId].add(task);
}
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java
index d1313bf..1e2fa66 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java
@@ -37,17 +37,15 @@
private int containerId;
private int taskNum;
- private Configuration configuration;
private MetricGroup metricGroup;
private FetcherService fetcherService;
private EmitterService emitterService;
public TaskService(int containerId, int taskNum, Configuration configuration,
MetricGroup metricGroup, FetcherService fetcherService, EmitterService emitterService) {
- super(WORKER_FORMAT);
+ super(configuration, WORKER_FORMAT);
this.containerId = containerId;
this.taskNum = taskNum;
- this.configuration = configuration;
this.metricGroup = metricGroup;
this.fetcherService = fetcherService;
this.emitterService = emitterService;
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java
index 3f0005f..a7db49e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java
@@ -21,6 +21,7 @@
import com.antgroup.geaflow.cluster.protocol.ICommand;
import com.antgroup.geaflow.cluster.task.service.AbstractTaskService;
+import com.antgroup.geaflow.common.config.Configuration;
public class DispatcherService extends AbstractTaskService<ICommand, Dispatcher> {
@@ -28,8 +29,8 @@
private Dispatcher dispatcher;
- public DispatcherService(Dispatcher dispatcher) {
- super(MESSAGE_FORMAT);
+ public DispatcherService(Dispatcher dispatcher, Configuration configuration) {
+ super(configuration, MESSAGE_FORMAT);
this.dispatcher = dispatcher;
}
diff --git a/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java b/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java
index d385640..f5ed310 100644
--- a/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java
+++ b/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java
@@ -85,6 +85,11 @@
}
@Override
+ protected void validateContainerNum(int containerNum) {
+ Preconditions.checkArgument(containerNum == 1, "local mode containerNum must equal with 1");
+ }
+
+ @Override
public void close() {
super.close();
if (appPath != null) {