Use ExecutorService instead of event loop for Netty connection (#13904)

* Use ExecutorService instead of event loop for Netty connection

This change addresses the issue of the event loop being blocked for an extended period, improving overall performance and responsiveness.

* fix log getConnectAddress

* applay format

* applay format

* applay format

* applay format

* applay format

* Use an independent ExecutorService

* Use ScheduledExecutorService for scheduling tasks

* delete unnecessary files

* Modify to stop ExecutorService using shutdownNow

* Modify to use ScheduledExecutor managed by FrameworkExecutorRepository

* Synchronize reconnectDuration with HeaderExchangeClient's reconnectDuration

* get framework model directly

use awaitility
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 7b465c6..7be50ff 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -21,6 +21,7 @@
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
@@ -28,9 +29,11 @@
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+import org.apache.dubbo.rpc.model.FrameworkModel;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -41,6 +44,11 @@
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
 import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
+import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
+import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION_KEY;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
 
 /**
  * AbstractClient
@@ -55,13 +63,23 @@
 
     protected volatile ExecutorService executor;
 
+    protected volatile ScheduledExecutorService connectivityExecutor;
+
+    private FrameworkModel frameworkModel;
+
+    protected long reconnectDuaration;
+
     public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
         super(url, handler);
         // set default needReconnect true when channel is not connected
         needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
 
+        frameworkModel = url.getOrDefaultFrameworkModel();
+
         initExecutor(url);
 
+        reconnectDuaration = getReconnectDuration(url);
+
         try {
             doOpen();
         } catch (Throwable t) {
@@ -134,6 +152,11 @@
         url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
                 .addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
         executor = executorRepository.createExecutorIfAbsent(url);
+
+        connectivityExecutor = frameworkModel
+                .getBeanFactory()
+                .getBean(FrameworkExecutorRepository.class)
+                .getConnectivityScheduledExecutor();
     }
 
     protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
@@ -296,6 +319,25 @@
         }
     }
 
+    private long getReconnectDuration(URL url) {
+        int idleTimeout = getIdleTimeout(url);
+        long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
+        return calculateReconnectDuration(url, heartbeatTimeoutTick);
+    }
+
+    private long calculateLeastDuration(int time) {
+        if (time / HEARTBEAT_CHECK_TICK <= 0) {
+            return LEAST_HEARTBEAT_DURATION;
+        } else {
+            return time / HEARTBEAT_CHECK_TICK;
+        }
+    }
+
+    private long calculateReconnectDuration(URL url, long tick) {
+        long leastReconnectDuration = url.getParameter(LEAST_RECONNECT_DURATION_KEY, LEAST_RECONNECT_DURATION);
+        return Math.max(leastReconnectDuration, tick);
+    }
+
     @Override
     public void reconnect() throws RemotingException {
         connectLock.lock();
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index ab8f39b..85cd565 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -43,7 +43,6 @@
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoop;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -75,6 +74,8 @@
 
     public static final AttributeKey<AbstractConnectionClient> CONNECTION = AttributeKey.valueOf("connection");
 
+    private AtomicBoolean isReconnecting;
+
     public NettyConnectionClient(URL url, ChannelHandler handler) throws RemotingException {
         super(url, handler);
     }
@@ -91,6 +92,7 @@
         this.closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
         this.init = new AtomicBoolean(false);
         this.increase();
+        this.isReconnecting = new AtomicBoolean(false);
     }
 
     @Override
@@ -158,6 +160,10 @@
 
     @Override
     protected void doConnect() throws RemotingException {
+        if (!isReconnecting.compareAndSet(false, true)) {
+            return;
+        }
+
         if (isClosed()) {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
@@ -347,6 +353,11 @@
 
         @Override
         public void operationComplete(ChannelFuture future) {
+
+            if (!isReconnecting.compareAndSet(true, false)) {
+                return;
+            }
+
             if (future.isSuccess()) {
                 return;
             }
@@ -364,8 +375,8 @@
                         "%s is reconnecting, attempt=%d cause=%s",
                         connectionClient, 0, future.cause().getMessage()));
             }
-            final EventLoop loop = future.channel().eventLoop();
-            loop.schedule(
+
+            connectivityExecutor.schedule(
                     () -> {
                         try {
                             connectionClient.doConnect();
@@ -377,8 +388,8 @@
                                     "Failed to connect to server: " + getConnectAddress());
                         }
                     },
-                    1L,
-                    TimeUnit.SECONDS);
+                    reconnectDuaration,
+                    TimeUnit.MILLISECONDS);
         }
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
index 37b4439..c90418f 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
@@ -28,6 +28,7 @@
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ModuleModel;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@
 import org.junit.jupiter.api.Test;
 
 import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static org.awaitility.Awaitility.await;
 
 public class ConnectionTest {
 
@@ -138,6 +140,7 @@
 
         nettyPortUnificationServer.bind();
         // auto reconnect
+        await().atMost(Duration.ofSeconds(100)).until(() -> connectionClient.isAvailable());
         Assertions.assertTrue(connectionClient.isAvailable());
 
         connectionClient.close();