Use ExecutorService instead of event loop for Netty connection (#13904)
* Use ExecutorService instead of event loop for Netty connection
This change addresses the issue of the event loop being blocked for an extended period, improving overall performance and responsiveness.
* fix log getConnectAddress
* applay format
* applay format
* applay format
* applay format
* applay format
* Use an independent ExecutorService
* Use ScheduledExecutorService for scheduling tasks
* delete unnecessary files
* Modify to stop ExecutorService using shutdownNow
* Modify to use ScheduledExecutor managed by FrameworkExecutorRepository
* Synchronize reconnectDuration with HeaderExchangeClient's reconnectDuration
* get framework model directly
use awaitility
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 7b465c6..7be50ff 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -21,6 +21,7 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -28,9 +29,11 @@
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -41,6 +44,11 @@
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
+import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
+import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION_KEY;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
/**
* AbstractClient
@@ -55,13 +63,23 @@
protected volatile ExecutorService executor;
+ protected volatile ScheduledExecutorService connectivityExecutor;
+
+ private FrameworkModel frameworkModel;
+
+ protected long reconnectDuaration;
+
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// set default needReconnect true when channel is not connected
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
+ frameworkModel = url.getOrDefaultFrameworkModel();
+
initExecutor(url);
+ reconnectDuaration = getReconnectDuration(url);
+
try {
doOpen();
} catch (Throwable t) {
@@ -134,6 +152,11 @@
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
+
+ connectivityExecutor = frameworkModel
+ .getBeanFactory()
+ .getBean(FrameworkExecutorRepository.class)
+ .getConnectivityScheduledExecutor();
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
@@ -296,6 +319,25 @@
}
}
+ private long getReconnectDuration(URL url) {
+ int idleTimeout = getIdleTimeout(url);
+ long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
+ return calculateReconnectDuration(url, heartbeatTimeoutTick);
+ }
+
+ private long calculateLeastDuration(int time) {
+ if (time / HEARTBEAT_CHECK_TICK <= 0) {
+ return LEAST_HEARTBEAT_DURATION;
+ } else {
+ return time / HEARTBEAT_CHECK_TICK;
+ }
+ }
+
+ private long calculateReconnectDuration(URL url, long tick) {
+ long leastReconnectDuration = url.getParameter(LEAST_RECONNECT_DURATION_KEY, LEAST_RECONNECT_DURATION);
+ return Math.max(leastReconnectDuration, tick);
+ }
+
@Override
public void reconnect() throws RemotingException {
connectLock.lock();
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index ab8f39b..85cd565 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -43,7 +43,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
@@ -75,6 +74,8 @@
public static final AttributeKey<AbstractConnectionClient> CONNECTION = AttributeKey.valueOf("connection");
+ private AtomicBoolean isReconnecting;
+
public NettyConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}
@@ -91,6 +92,7 @@
this.closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
this.init = new AtomicBoolean(false);
this.increase();
+ this.isReconnecting = new AtomicBoolean(false);
}
@Override
@@ -158,6 +160,10 @@
@Override
protected void doConnect() throws RemotingException {
+ if (!isReconnecting.compareAndSet(false, true)) {
+ return;
+ }
+
if (isClosed()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -347,6 +353,11 @@
@Override
public void operationComplete(ChannelFuture future) {
+
+ if (!isReconnecting.compareAndSet(true, false)) {
+ return;
+ }
+
if (future.isSuccess()) {
return;
}
@@ -364,8 +375,8 @@
"%s is reconnecting, attempt=%d cause=%s",
connectionClient, 0, future.cause().getMessage()));
}
- final EventLoop loop = future.channel().eventLoop();
- loop.schedule(
+
+ connectivityExecutor.schedule(
() -> {
try {
connectionClient.doConnect();
@@ -377,8 +388,8 @@
"Failed to connect to server: " + getConnectAddress());
}
},
- 1L,
- TimeUnit.SECONDS);
+ reconnectDuaration,
+ TimeUnit.MILLISECONDS);
}
}
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
index 37b4439..c90418f 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
@@ -28,6 +28,7 @@
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@
import org.junit.jupiter.api.Test;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static org.awaitility.Awaitility.await;
public class ConnectionTest {
@@ -138,6 +140,7 @@
nettyPortUnificationServer.bind();
// auto reconnect
+ await().atMost(Duration.ofSeconds(100)).until(() -> connectionClient.isAvailable());
Assertions.assertTrue(connectionClient.isAvailable());
connectionClient.close();