Add unit tests for remote connection
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index d6f636b..a64c29e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -34,6 +34,9 @@
 
     private int publicExecutorThreads = 4;
 
+    private int remotingShutdownQuietPeriodMillis = 2000;
+    private int remotingShutdownTimeoutMillis = 15000;
+
     public abstract int getOnewayInvokeSemaphore();
 
     public abstract int getAsyncInvokeSemaphore();
@@ -93,4 +96,20 @@
     public void setPublicExecutorThreads(final int publicExecutorThreads) {
         this.publicExecutorThreads = publicExecutorThreads;
     }
+
+    public int getRemotingShutdownQuietPeriodMillis() {
+        return remotingShutdownQuietPeriodMillis;
+    }
+
+    public void setRemotingShutdownQuietPeriodMillis(final int remotingShutdownQuietPeriodMillis) {
+        this.remotingShutdownQuietPeriodMillis = remotingShutdownQuietPeriodMillis;
+    }
+
+    public int getRemotingShutdownTimeoutMillis() {
+        return remotingShutdownTimeoutMillis;
+    }
+
+    public void setRemotingShutdownTimeoutMillis(final int remotingShutdownTimeoutMillis) {
+        this.remotingShutdownTimeoutMillis = remotingShutdownTimeoutMillis;
+    }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index c146813..9f0fd1c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -37,6 +37,7 @@
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -105,9 +106,11 @@
         try {
             clientChannelManager.clear();
 
-            this.ioGroup.shutdownGracefully();
+            this.ioGroup.shutdownGracefully(clientConfig.getRemotingShutdownQuietPeriodMillis(),
+                clientConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
 
-            this.workerGroup.shutdownGracefully();
+            this.workerGroup.shutdownGracefully(clientConfig.getRemotingShutdownQuietPeriodMillis(),
+                clientConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
         } catch (Exception e) {
             LOG.warn("RemotingClient stopped error !", e);
         }
@@ -183,6 +186,10 @@
         }
     }
 
+    public void setClientChannelManager(final ClientChannelManager clientChannelManager) {
+        this.clientChannelManager = clientChannelManager;
+    }
+
     private class ClientConnectionHandler extends ChannelDuplexHandler {
 
         @Override
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 0967208..e83cd1d 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -39,6 +39,7 @@
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
@@ -119,11 +120,14 @@
     @Override
     public void stop() {
         try {
-            this.bossGroup.shutdownGracefully().syncUninterruptibly();
+            this.bossGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(),
+                serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
 
-            this.ioGroup.shutdownGracefully().syncUninterruptibly();
+            this.ioGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(),
+                serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
 
-            this.workerGroup.shutdownGracefully().syncUninterruptibly();
+            this.workerGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(),
+                serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
         } catch (Exception e) {
             LOG.warn("RemotingServer stopped error !", e);
         }
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index 47fd4bb..14e8c21 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -26,6 +26,8 @@
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
 import org.assertj.core.api.Fail;
@@ -82,7 +84,7 @@
     }
 
     protected void shouldNotReachHere() {
-        throw new RuntimeException("shouldn't reach here");
+        Fail.fail("Shouldn't reach here");
     }
 
     protected RemotingCommand randomRemotingCommand() {
@@ -137,6 +139,20 @@
         return objectFuture;
     }
 
+    protected RemotingClientConfig clientConfig() {
+        RemotingClientConfig clientConfig = new RemotingClientConfig();
+        clientConfig.setRemotingShutdownQuietPeriodMillis(0);
+        clientConfig.setRemotingShutdownTimeoutMillis(10);
+        return clientConfig;
+    }
+
+    protected RemotingServerConfig serverConfig() {
+        RemotingServerConfig serverConfig = new RemotingServerConfig();
+        serverConfig.setRemotingShutdownQuietPeriodMillis(0);
+        serverConfig.setRemotingShutdownTimeoutMillis(10);
+        return serverConfig;
+    }
+
     protected class ObjectFuture<T> {
         volatile private T object;
         private Semaphore semaphore;
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java
new file mode 100644
index 0000000..cdf1d18
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
+import org.apache.rocketmq.remoting.internal.JvmUtils;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class EpollRemoteConnectionTest extends BaseTest {
+    private static RemotingServer remotingServer;
+    private static RemotingClient remotingClient;
+
+    private static RemotingServer remotingEpollServer;
+    private static RemotingClient remotingEpollClient;
+
+    private static short requestCode = 123;
+    private RemotingCommand request;
+
+    private static String remoteAddr;
+    private static String remoteEpollAddr;
+
+
+    @Before
+    public void enableOnLinux() throws Exception {
+        Assume.assumeTrue(JvmUtils.isLinux());
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        RemotingClientConfig clientConfig = new RemotingClientConfig();
+        clientConfig.setRemotingShutdownQuietPeriodMillis(0);
+        clientConfig.setRemotingShutdownTimeoutMillis(10);
+
+        RemotingServerConfig serverConfig = new RemotingServerConfig();
+        serverConfig.setRemotingShutdownQuietPeriodMillis(0);
+        serverConfig.setRemotingShutdownTimeoutMillis(10);
+
+        RemotingClientConfig epollClientConfig = new RemotingClientConfig();
+        epollClientConfig.setClientNativeEpollEnable(true);
+        epollClientConfig.setRemotingShutdownQuietPeriodMillis(0);
+        epollClientConfig.setRemotingShutdownTimeoutMillis(10);
+
+        RemotingServerConfig epollServerConfig = new RemotingServerConfig();
+        epollServerConfig.setServerNativeEpollEnable(true);
+        epollServerConfig.setServerListenPort(9999);
+        epollServerConfig.setRemotingShutdownQuietPeriodMillis(0);
+        epollServerConfig.setRemotingShutdownTimeoutMillis(10);
+
+        remotingClient = new NettyRemotingClient(clientConfig);
+        remotingServer = new NettyRemotingServer(serverConfig);
+
+        remotingEpollServer = new NettyRemotingServer(epollServerConfig);
+        remotingEpollClient = new NettyRemotingClient(epollClientConfig);
+
+        remotingServer.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+                response.payload("Pong".getBytes());
+                return response;
+            }
+        });
+
+        remotingEpollServer.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+                response.payload("Pong".getBytes());
+                return response;
+            }
+        });
+
+        remotingServer.start();
+        remotingClient.start();
+        remotingEpollClient.start();
+        remotingEpollServer.start();
+
+        remoteAddr = "127.0.0.1:" + remotingServer.localListenPort();
+        remoteEpollAddr = "127.0.0.1:" + remotingEpollServer.localListenPort();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        remotingClient.stop();
+        remotingServer.stop();
+        remotingEpollServer.stop();
+        remotingEpollClient.stop();
+    }
+
+    public RemotingCommand requestCommand() {
+        request = remotingClient.commandFactory().createRequest();
+        request.cmdCode(requestCode);
+        return request;
+    }
+
+    @Test
+    public void invokeToServer_Success() {
+        // Client to epoll server
+        RemotingCommand rsp = remotingClient.invoke(remoteEpollAddr, requestCommand(), 3000);
+        assertThat(new String(rsp.payload())).isEqualTo("Pong");
+
+        // Epoll client to server
+        rsp = remotingEpollClient.invoke(remoteAddr, requestCommand(), 3000);
+        assertThat(new String(rsp.payload())).isEqualTo("Pong");
+
+        // Epoll client to epoll server
+        rsp = remotingEpollClient.invoke(remoteEpollAddr, requestCommand(), 3000);
+        assertThat(new String(rsp.payload())).isEqualTo("Pong");
+    }
+
+    @Test
+    public void invokeAsyncToServer_Success() {
+        // Client to epoll server
+
+        final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000);
+
+        remotingClient.invokeAsync(remoteEpollAddr, requestCommand(), new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        }, 3000);
+
+        assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+
+        // Epoll client to server
+        remotingEpollClient.invokeAsync(remoteAddr, requestCommand(), new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        }, 3000);
+
+        assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+
+        // Epoll client to epoll server
+        remotingEpollClient.invokeAsync(remoteEpollAddr, requestCommand(), new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        }, 3000);
+
+        assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+    }
+
+    @Test
+    public void invokeOnewayToServer_Success() {
+        final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000);
+        final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000);
+
+        Interceptor interceptor = new Interceptor() {
+            @Override
+            public void beforeRequest(final RequestContext context) {
+                requestFuture.putObject(context.getRequest());
+                requestFuture.release();
+            }
+
+            @Override
+            public void afterResponseReceived(final ResponseContext context) {
+                responseFuture.putObject(context.getResponse());
+                responseFuture.release();
+            }
+        };
+
+        remotingServer.registerInterceptor(interceptor);
+        remotingEpollServer.registerInterceptor(interceptor);
+
+        // Client to epoll server
+        remotingClient.invokeOneWay(remoteEpollAddr, request);
+
+        assertThat(requestFuture.getObject()).isEqualTo(request);
+        assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+
+        // Epoll client to server
+        remotingEpollClient.invokeOneWay(remoteAddr, request);
+
+        assertThat(requestFuture.getObject()).isEqualTo(request);
+        assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+
+        // Epoll client to epoll server
+        remotingEpollClient.invokeOneWay(remoteEpollAddr, request);
+
+        assertThat(requestFuture.getObject()).isEqualTo(request);
+        assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+    }
+}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java
new file mode 100644
index 0000000..d12a5a3
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class NettyRemoteConnectionTest extends BaseTest {
+    private static RemotingServer remotingServer;
+    private static RemotingClient remotingClient;
+
+    private static short requestCode = 123;
+    private RemotingCommand request;
+
+    private static String remoteAddr;
+    private static RemotingChannel channelInServer;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        RemotingClientConfig clientConfig = new RemotingClientConfig();
+        clientConfig.setRemotingShutdownQuietPeriodMillis(0);
+        clientConfig.setRemotingShutdownTimeoutMillis(10);
+
+        RemotingServerConfig serverConfig = new RemotingServerConfig();
+        serverConfig.setRemotingShutdownQuietPeriodMillis(0);
+        serverConfig.setRemotingShutdownTimeoutMillis(10);
+
+        remotingClient = new NettyRemotingClient(clientConfig);
+        remotingServer = new NettyRemotingServer(serverConfig);
+
+        remotingServer.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+                response.payload("Pong".getBytes());
+                return response;
+            }
+        });
+
+        remotingClient.registerRequestProcessor(requestCode, new RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+                RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+                response.payload("ClientPong".getBytes());
+                return response;
+            }
+        });
+
+        remotingServer.registerChannelEventListener(new ChannelEventListener() {
+            @Override
+            public void onChannelConnect(final RemotingChannel channel) {
+                channelInServer = channel;
+            }
+
+            @Override
+            public void onChannelClose(final RemotingChannel channel) {
+                channelInServer = null;
+            }
+
+            @Override
+            public void onChannelException(final RemotingChannel channel, final Throwable cause) {
+                channelInServer = null;
+            }
+
+            @Override
+            public void onChannelIdle(final RemotingChannel channel) {
+                channelInServer = null;
+            }
+        });
+
+        remotingServer.start();
+        remotingClient.start();
+
+        remoteAddr = "127.0.0.1:" + remotingServer.localListenPort();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        remotingClient.stop();
+        remotingServer.stop();
+    }
+
+    @Before
+    public void setUp0() throws Exception {
+        request = remotingClient.commandFactory().createRequest();
+        request.cmdCode(requestCode);
+
+        if (channelInServer == null) {
+            RemotingCommand rsp = remotingClient.invoke(remoteAddr, request, 3000);
+            assertThat(new String(rsp.payload())).isEqualTo("Pong");
+
+            // Refresh the command
+            request = remotingClient.commandFactory().createRequest();
+            request.cmdCode(requestCode);
+        }
+    }
+
+    @Test
+    public void invokeToServer_Success() {
+        RemotingCommand rsp = remotingClient.invoke(remoteAddr, request, 3000);
+        assertThat(new String(rsp.payload())).isEqualTo("Pong");
+    }
+
+    @Test
+    public void invokeAsyncToServer_Success() {
+        final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000);
+
+        remotingClient.invokeAsync(remoteAddr, request, new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        }, 3000);
+
+        assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+    }
+
+    @Test
+    public void invokeOnewayToServer_Success() {
+        final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000);
+        final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000);
+
+        remotingServer.registerInterceptor(new Interceptor() {
+            @Override
+            public void beforeRequest(final RequestContext context) {
+                requestFuture.putObject(context.getRequest());
+                requestFuture.release();
+            }
+
+            @Override
+            public void afterResponseReceived(final ResponseContext context) {
+                responseFuture.putObject(context.getResponse());
+                responseFuture.release();
+            }
+        });
+
+        remotingClient.invokeOneWay(remoteAddr, request);
+
+        assertThat(requestFuture.getObject()).isEqualTo(request);
+        assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+    }
+
+    @Test
+    public void invokeToClient_Success() {
+        RemotingCommand rsp = remotingServer.invoke(channelInServer, request, 3000);
+        assertThat(new String(rsp.payload())).isEqualTo("ClientPong");
+    }
+
+    @Test
+    public void invokeAsyncToClient_Success() {
+        final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000);
+
+        remotingServer.invokeAsync(channelInServer, request, new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        }, 3000);
+
+        assertThat(new String(objectFuture.getObject().payload())).isEqualTo("ClientPong");
+    }
+
+    @Test
+    public void invokeOnewayToClient_Success() {
+        final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000);
+        final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000);
+
+        remotingClient.registerInterceptor(new Interceptor() {
+            @Override
+            public void beforeRequest(final RequestContext context) {
+                requestFuture.putObject(context.getRequest());
+                requestFuture.release();
+            }
+
+            @Override
+            public void afterResponseReceived(final ResponseContext context) {
+                responseFuture.putObject(context.getResponse());
+                responseFuture.release();
+            }
+        });
+
+        remotingServer.invokeOneWay(channelInServer, request);
+
+        assertThat(requestFuture.getObject()).isEqualTo(request);
+        assertThat(new String(responseFuture.getObject().payload())).isEqualTo("ClientPong");
+    }
+}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java
new file mode 100644
index 0000000..aee21fb
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyRemotingClientTest extends BaseTest {
+    @Spy
+    private NettyRemotingClient remotingClient = new NettyRemotingClient(clientConfig());
+
+    @Mock
+    private ClientChannelManager channelManager;
+
+    @Mock
+    private Channel mockedChannel;
+
+    @Before
+    public void setUp() {
+        remotingClient.start();
+        remotingClient.setClientChannelManager(channelManager);
+        when(channelManager.createIfAbsent(any(String.class))).thenReturn(mockedChannel);
+        when(mockedChannel.isActive()).thenReturn(true);
+    }
+
+    @After
+    public void tearDown() {
+        remotingClient.stop();
+    }
+
+    @Test
+    public void invoke_Success() {
+        RemotingCommand request = remotingClient.commandFactory().createRequest();
+        final RemotingCommand response = remotingClient.commandFactory().createResponse(request);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                return response;
+            }
+        }).when(remotingClient).invokeWithInterceptor(mockedChannel, request, 3000);
+
+        RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000);
+
+        assertThat(response).isEqualTo(returnedResp);
+    }
+
+    @Test
+    public void invoke_ConnectFailureException() {
+        RemotingCommand request = remotingClient.commandFactory().createRequest();
+        when(mockedChannel.isActive()).thenReturn(false);
+
+        try {
+            RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000);
+            failBecauseExceptionWasNotThrown(RemoteConnectFailureException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(RemoteConnectFailureException.class);
+        }
+    }
+
+    @Test
+    public void invoke_TimeoutException() {
+        RemotingCommand request = remotingClient.commandFactory().createRequest();
+
+        doThrow(new RemoteTimeoutException("Timeout exception occurred")).when(remotingClient).invokeWithInterceptor(mockedChannel, request, 3000);
+
+        try {
+            RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000);
+            failBecauseExceptionWasNotThrown(RemoteTimeoutException.class);
+        } catch (Exception e) {
+            assertThat(e).isInstanceOf(RemoteTimeoutException.class);
+        }
+    }
+
+    @Test
+    public void invokeAsync_Success() {
+        RemotingCommand request = remotingClient.commandFactory().createRequest();
+        final RemotingCommand response = remotingClient.commandFactory().createResponse(request);
+
+        final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 100);
+
+        final AsyncHandler asyncHandler = new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+                objectFuture.putObject(response);
+                objectFuture.release();
+            }
+        };
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable {
+                asyncHandler.onSuccess(response);
+                return null;
+            }
+        }).when(remotingClient).invokeAsyncWithInterceptor(mockedChannel, request, asyncHandler,3000);
+
+        remotingClient.invokeAsync("127.0.0.1:10911", request, asyncHandler, 3000);
+
+        assertThat(objectFuture.getObject()).isEqualTo(response);
+
+    }
+}
\ No newline at end of file