[3.0] Fix Executor rewrite (#10478)
* [3.0] Fix Executor rewrite
* fix init
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index ebf880d..ef03daa 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -20,6 +20,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -30,6 +31,7 @@
import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
@@ -44,7 +46,7 @@
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
- private ExecutorService executor;
+ private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
@@ -72,7 +74,7 @@
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
- executor = executorRepository.createExecutorIfAbsent(url);
+ executors.add(executorRepository.createExecutorIfAbsent(url));
}
protected abstract void doOpen() throws Throwable;
@@ -96,6 +98,8 @@
logger.error(t.getMessage(), t);
}
+ ExecutorService executor = executorRepository.createExecutorIfAbsent(url);
+ executors.add(executor);
executorRepository.updateThreadpool(url, executor);
super.setUrl(getUrl().addParameters(url.getParameters()));
}
@@ -116,7 +120,9 @@
logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
- ExecutorUtil.shutdownNow(executor, 100);
+ for (ExecutorService executor : executors) {
+ ExecutorUtil.shutdownNow(executor, 100);
+ }
try {
super.close();
@@ -133,7 +139,9 @@
@Override
public void close(int timeout) {
- ExecutorUtil.gracefulShutdown(executor, timeout);
+ for (ExecutorService executor : executors) {
+ ExecutorUtil.gracefulShutdown(executor, timeout);
+ }
close();
}