[Cluster] Fix BRPC endpoint conflict in local multi-container deployment (#548)

* [Cluster] Fix BRPC endpoint conflict in local multi-container deployment (#547)

* Fix BRPC endpoint registration conflict when launching multiple containers in local mode
* Add geaflow.executor.thread.max.multiple configuration for thread pool scaling

* [Cluster] Make DEFAULT_MAX_MULTIPLE private

* [Cluster] Fix Checkstyle violations in Javadoc comments

* [Cluster] Fix remove unused import
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java
index 8c3528d..2838829 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ExecutionConfigKeys.java
@@ -281,6 +281,11 @@
         .noDefaultValue()
         .description("container max heap size in mb");
 
+    public static final ConfigKey EXECUTOR_MAX_MULTIPLE = ConfigKeys
+            .key("geaflow.executor.thread.max.multiple")
+            .defaultValue(10)
+            .description("Maximum thread pool size multiplier (maxThreads = multiple * available cores)");
+
     public static final ConfigKey FO_ENABLE = ConfigKeys
         .key("geaflow.fo.enable")
         .defaultValue(true)
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java
index b4fb0d5..d7d5122 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/thread/Executors.java
@@ -35,6 +35,7 @@
     private static final int DEFAULT_KEEP_ALIVE_MINUTES = 30;
     private static final int DEFAULT_QUEUE_CAPACITY = 1024;
     private static final int DEFAULT_MAGNIFICATION = 2;
+    private static final int DEFAULT_MAX_MULTIPLE = 10;
 
     private static final Map<String, ExecutorService> BOUNDED_EXECUTORS = new HashMap<>();
     private static final Map<String, ExecutorService> UNBOUNDED_EXECUTORS = new HashMap<>();
@@ -90,38 +91,43 @@
             TimeUnit.MINUTES);
     }
 
-    public static synchronized ExecutorService getExecutorService(int coreNumber,
-                                                                  String threadFormat) {
-        int maxThreads = 10 * CORE_NUM;
-        Preconditions.checkArgument(coreNumber > 0 && coreNumber <= maxThreads,
-            "executor threads should be smaller than " + maxThreads);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(threadFormat),
-            "thread format couldn't" + " be empty");
-        return getNamedService(coreNumber, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MINUTES,
-            TimeUnit.MINUTES, threadFormat, null);
+    public static ExecutorService getExecutorService(int coreNumber,
+                                                     String threadFormat) {
+        return getExecutorService(DEFAULT_MAX_MULTIPLE, coreNumber, threadFormat, null);
     }
 
-    public static synchronized ExecutorService getExecutorService(int coreNumber,
+    public static ExecutorService getExecutorService(int coreNumber,
+                                                     String threadFormat,
+                                                     Thread.UncaughtExceptionHandler handler) {
+        return getExecutorService(DEFAULT_MAX_MULTIPLE, coreNumber, threadFormat, handler);
+    }
+
+    public static ExecutorService getExecutorService(int maxMultiple,
+                                                     int coreNumber,
+                                                     String threadFormat) {
+        return getExecutorService(maxMultiple, coreNumber, threadFormat, null);
+    }
+
+    /**
+     * Creates an ExecutorService with following params.
+     *
+     * @param maxMultiple Maximum threads multiplier
+     * @param coreNumber Number of core threads
+     * @param threadFormat Thread name format
+     * @param handler Uncaught exception handler
+     * @return Configured ExecutorService
+     */
+    public static synchronized ExecutorService getExecutorService(int maxMultiple,
+                                                                  int coreNumber,
                                                                   String threadFormat,
                                                                   Thread.UncaughtExceptionHandler handler) {
-        int maxThreads = 10 * CORE_NUM;
-        Preconditions.checkArgument(coreNumber > 0 && coreNumber <= maxThreads,
-            "executor threads should be smaller than " + maxThreads);
-        Preconditions.checkArgument(StringUtils.isNotEmpty(threadFormat),
-            "thread format couldn't" + " be empty");
-        return getNamedService(coreNumber, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MINUTES,
-            TimeUnit.MINUTES, threadFormat, handler);
-    }
-
-    public static synchronized ExecutorService getExecutorService(int maxMultiple, int coreNumber,
-                                                                  String threadFormat) {
         int maxThreads = maxMultiple * CORE_NUM;
         Preconditions.checkArgument(coreNumber > 0 && coreNumber <= maxThreads,
             "executor threads should be smaller than " + maxThreads);
         Preconditions.checkArgument(StringUtils.isNotEmpty(threadFormat),
             "thread format couldn't" + " be empty");
         return getNamedService(coreNumber, Integer.MAX_VALUE, DEFAULT_KEEP_ALIVE_MINUTES,
-            TimeUnit.MINUTES, threadFormat, null);
+            TimeUnit.MINUTES, threadFormat, handler);
     }
 
     private static synchronized ExecutorService getNamedService(int bound, int capacity,
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java
index cea6445..8b9665b 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java
@@ -96,6 +96,7 @@
     }
 
     protected void startContainers(int containerNum) {
+        validateContainerNum(containerNum);
         Map<Integer, String> containerIds = new HashMap<>();
         for (int i = 0; i < containerNum; i++) {
             int containerId = generateNextComponentId();
@@ -173,6 +174,9 @@
 
     protected abstract IFailoverStrategy buildFailoverStrategy();
 
+    protected void validateContainerNum(int containerNum) {
+    }
+
     @Override
     public void doFailover(int componentId, Throwable cause) {
         foStrategy.doFailover(componentId, cause);
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java
index cbf04dc..4b87d66 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/collector/EmitterService.java
@@ -29,12 +29,10 @@
     private static final String EMITTER_FORMAT = "geaflow-emitter-%d";
 
     private final int slots;
-    private final Configuration configuration;
 
     public EmitterService(int slots, Configuration configuration) {
-        super(EMITTER_FORMAT);
+        super(configuration, EMITTER_FORMAT);
         this.slots = slots;
-        this.configuration = configuration;
     }
 
     protected EmitterRunner[] buildTaskRunner() {
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java
index 782169d..7eeb93b 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/container/Container.java
@@ -97,7 +97,7 @@
                 this.workerService = new TaskService(id, num,
                     configuration, metricGroup, fetcherService, emitterService);
                 this.dispatcher = new Dispatcher(workerService);
-                this.dispatcherService = new DispatcherService(dispatcher);
+                this.dispatcherService = new DispatcherService(dispatcher, configuration);
 
                 // start task service
                 this.fetcherService.start();
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java
index 1878857..920965e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/fetcher/FetcherService.java
@@ -33,12 +33,10 @@
     private static final String FETCHER_FORMAT = "geaflow-fetcher-%d";
 
     private int slots;
-    private Configuration configuration;
 
     public FetcherService(int slots, Configuration configuration) {
-        super(FETCHER_FORMAT);
+        super(configuration, FETCHER_FORMAT);
         this.slots = slots;
-        this.configuration = configuration;
     }
 
     @Override
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java
index bb6246a..be87bf3 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/AbstractTaskService.java
@@ -21,6 +21,8 @@
 
 import com.antgroup.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
 import com.antgroup.geaflow.cluster.task.runner.ITaskRunner;
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
 import com.antgroup.geaflow.common.thread.Executors;
 import com.antgroup.geaflow.common.utils.ExecutorUtil;
 import com.google.common.base.Preconditions;
@@ -35,21 +37,32 @@
     protected ExecutorService executorService;
     private R[] tasks;
     private String threadFormat;
+    protected final Configuration configuration;
 
-    public AbstractTaskService(String threadFormat) {
+    public AbstractTaskService(Configuration configuration, String threadFormat) {
         this.threadFormat = threadFormat;
+        this.configuration = configuration;
     }
 
     public void start() {
         this.tasks = buildTaskRunner();
         Preconditions.checkArgument(tasks != null && tasks.length != 0, "must specify at least one task");
-        this.executorService = Executors.getExecutorService(tasks.length, threadFormat,
+        this.executorService = Executors.getExecutorService(getMaxMultiple(), tasks.length, threadFormat,
             ComponentUncaughtExceptionHandler.INSTANCE);
         for (int i = 0; i < tasks.length; i++) {
             executorService.execute(tasks[i]);
         }
     }
 
+    /**
+     * Provides the maximum thread multiplier value.
+     *
+     * @return the maximum thread multiplier
+     */
+    protected int getMaxMultiple() {
+        return configuration.getInteger(ExecutionConfigKeys.EXECUTOR_MAX_MULTIPLE);
+    }
+
     public void process(int workerId, TASK task) {
         tasks[workerId].add(task);
     }
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java
index d1313bf..1e2fa66 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/task/service/TaskService.java
@@ -37,17 +37,15 @@
 
     private int containerId;
     private int taskNum;
-    private Configuration configuration;
     private MetricGroup metricGroup;
     private FetcherService fetcherService;
     private EmitterService emitterService;
 
     public TaskService(int containerId, int taskNum, Configuration configuration,
                        MetricGroup metricGroup, FetcherService fetcherService, EmitterService emitterService) {
-        super(WORKER_FORMAT);
+        super(configuration, WORKER_FORMAT);
         this.containerId = containerId;
         this.taskNum = taskNum;
-        this.configuration = configuration;
         this.metricGroup = metricGroup;
         this.fetcherService = fetcherService;
         this.emitterService = emitterService;
diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java
index 3f0005f..a7db49e 100644
--- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java
+++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/worker/DispatcherService.java
@@ -21,6 +21,7 @@
 
 import com.antgroup.geaflow.cluster.protocol.ICommand;
 import com.antgroup.geaflow.cluster.task.service.AbstractTaskService;
+import com.antgroup.geaflow.common.config.Configuration;
 
 public class DispatcherService extends AbstractTaskService<ICommand, Dispatcher> {
 
@@ -28,8 +29,8 @@
 
     private Dispatcher dispatcher;
 
-    public DispatcherService(Dispatcher dispatcher) {
-        super(MESSAGE_FORMAT);
+    public DispatcherService(Dispatcher dispatcher, Configuration configuration) {
+        super(configuration, MESSAGE_FORMAT);
         this.dispatcher = dispatcher;
     }
 
diff --git a/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java b/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java
index d385640..f5ed310 100644
--- a/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java
+++ b/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java
@@ -85,6 +85,11 @@
     }
 
     @Override
+    protected void validateContainerNum(int containerNum) {
+        Preconditions.checkArgument(containerNum == 1, "local mode containerNum must equal with 1");
+    }
+
+    @Override
     public void close() {
         super.close();
         if (appPath != null) {