Add unit tests for remote connection
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index d6f636b..a64c29e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -34,6 +34,9 @@
private int publicExecutorThreads = 4;
+ private int remotingShutdownQuietPeriodMillis = 2000;
+ private int remotingShutdownTimeoutMillis = 15000;
+
public abstract int getOnewayInvokeSemaphore();
public abstract int getAsyncInvokeSemaphore();
@@ -93,4 +96,20 @@
public void setPublicExecutorThreads(final int publicExecutorThreads) {
this.publicExecutorThreads = publicExecutorThreads;
}
+
+ public int getRemotingShutdownQuietPeriodMillis() {
+ return remotingShutdownQuietPeriodMillis;
+ }
+
+ public void setRemotingShutdownQuietPeriodMillis(final int remotingShutdownQuietPeriodMillis) {
+ this.remotingShutdownQuietPeriodMillis = remotingShutdownQuietPeriodMillis;
+ }
+
+ public int getRemotingShutdownTimeoutMillis() {
+ return remotingShutdownTimeoutMillis;
+ }
+
+ public void setRemotingShutdownTimeoutMillis(final int remotingShutdownTimeoutMillis) {
+ this.remotingShutdownTimeoutMillis = remotingShutdownTimeoutMillis;
+ }
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index c146813..9f0fd1c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -37,6 +37,7 @@
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -105,9 +106,11 @@
try {
clientChannelManager.clear();
- this.ioGroup.shutdownGracefully();
+ this.ioGroup.shutdownGracefully(clientConfig.getRemotingShutdownQuietPeriodMillis(),
+ clientConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
- this.workerGroup.shutdownGracefully();
+ this.workerGroup.shutdownGracefully(clientConfig.getRemotingShutdownQuietPeriodMillis(),
+ clientConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
} catch (Exception e) {
LOG.warn("RemotingClient stopped error !", e);
}
@@ -183,6 +186,10 @@
}
}
+ public void setClientChannelManager(final ClientChannelManager clientChannelManager) {
+ this.clientChannelManager = clientChannelManager;
+ }
+
private class ClientConnectionHandler extends ChannelDuplexHandler {
@Override
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 0967208..e83cd1d 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -39,6 +39,7 @@
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
@@ -119,11 +120,14 @@
@Override
public void stop() {
try {
- this.bossGroup.shutdownGracefully().syncUninterruptibly();
+ this.bossGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(),
+ serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
- this.ioGroup.shutdownGracefully().syncUninterruptibly();
+ this.ioGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(),
+ serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
- this.workerGroup.shutdownGracefully().syncUninterruptibly();
+ this.workerGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(),
+ serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync();
} catch (Exception e) {
LOG.warn("RemotingServer stopped error !", e);
}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index 47fd4bb..14e8c21 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -26,6 +26,8 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
import org.assertj.core.api.Fail;
@@ -82,7 +84,7 @@
}
protected void shouldNotReachHere() {
- throw new RuntimeException("shouldn't reach here");
+ Fail.fail("Shouldn't reach here");
}
protected RemotingCommand randomRemotingCommand() {
@@ -137,6 +139,20 @@
return objectFuture;
}
+ protected RemotingClientConfig clientConfig() {
+ RemotingClientConfig clientConfig = new RemotingClientConfig();
+ clientConfig.setRemotingShutdownQuietPeriodMillis(0);
+ clientConfig.setRemotingShutdownTimeoutMillis(10);
+ return clientConfig;
+ }
+
+ protected RemotingServerConfig serverConfig() {
+ RemotingServerConfig serverConfig = new RemotingServerConfig();
+ serverConfig.setRemotingShutdownQuietPeriodMillis(0);
+ serverConfig.setRemotingShutdownTimeoutMillis(10);
+ return serverConfig;
+ }
+
protected class ObjectFuture<T> {
volatile private T object;
private Semaphore semaphore;
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java
new file mode 100644
index 0000000..cdf1d18
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
+import org.apache.rocketmq.remoting.internal.JvmUtils;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class EpollRemoteConnectionTest extends BaseTest {
+ private static RemotingServer remotingServer;
+ private static RemotingClient remotingClient;
+
+ private static RemotingServer remotingEpollServer;
+ private static RemotingClient remotingEpollClient;
+
+ private static short requestCode = 123;
+ private RemotingCommand request;
+
+ private static String remoteAddr;
+ private static String remoteEpollAddr;
+
+
+ @Before
+ public void enableOnLinux() throws Exception {
+ Assume.assumeTrue(JvmUtils.isLinux());
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ RemotingClientConfig clientConfig = new RemotingClientConfig();
+ clientConfig.setRemotingShutdownQuietPeriodMillis(0);
+ clientConfig.setRemotingShutdownTimeoutMillis(10);
+
+ RemotingServerConfig serverConfig = new RemotingServerConfig();
+ serverConfig.setRemotingShutdownQuietPeriodMillis(0);
+ serverConfig.setRemotingShutdownTimeoutMillis(10);
+
+ RemotingClientConfig epollClientConfig = new RemotingClientConfig();
+ epollClientConfig.setClientNativeEpollEnable(true);
+ epollClientConfig.setRemotingShutdownQuietPeriodMillis(0);
+ epollClientConfig.setRemotingShutdownTimeoutMillis(10);
+
+ RemotingServerConfig epollServerConfig = new RemotingServerConfig();
+ epollServerConfig.setServerNativeEpollEnable(true);
+ epollServerConfig.setServerListenPort(9999);
+ epollServerConfig.setRemotingShutdownQuietPeriodMillis(0);
+ epollServerConfig.setRemotingShutdownTimeoutMillis(10);
+
+ remotingClient = new NettyRemotingClient(clientConfig);
+ remotingServer = new NettyRemotingServer(serverConfig);
+
+ remotingEpollServer = new NettyRemotingServer(epollServerConfig);
+ remotingEpollClient = new NettyRemotingClient(epollClientConfig);
+
+ remotingServer.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+ response.payload("Pong".getBytes());
+ return response;
+ }
+ });
+
+ remotingEpollServer.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+ response.payload("Pong".getBytes());
+ return response;
+ }
+ });
+
+ remotingServer.start();
+ remotingClient.start();
+ remotingEpollClient.start();
+ remotingEpollServer.start();
+
+ remoteAddr = "127.0.0.1:" + remotingServer.localListenPort();
+ remoteEpollAddr = "127.0.0.1:" + remotingEpollServer.localListenPort();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ remotingClient.stop();
+ remotingServer.stop();
+ remotingEpollServer.stop();
+ remotingEpollClient.stop();
+ }
+
+ public RemotingCommand requestCommand() {
+ request = remotingClient.commandFactory().createRequest();
+ request.cmdCode(requestCode);
+ return request;
+ }
+
+ @Test
+ public void invokeToServer_Success() {
+ // Client to epoll server
+ RemotingCommand rsp = remotingClient.invoke(remoteEpollAddr, requestCommand(), 3000);
+ assertThat(new String(rsp.payload())).isEqualTo("Pong");
+
+ // Epoll client to server
+ rsp = remotingEpollClient.invoke(remoteAddr, requestCommand(), 3000);
+ assertThat(new String(rsp.payload())).isEqualTo("Pong");
+
+ // Epoll client to epoll server
+ rsp = remotingEpollClient.invoke(remoteEpollAddr, requestCommand(), 3000);
+ assertThat(new String(rsp.payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeAsyncToServer_Success() {
+ // Client to epoll server
+
+ final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000);
+
+ remotingClient.invokeAsync(remoteEpollAddr, requestCommand(), new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ }, 3000);
+
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+
+ // Epoll client to server
+ remotingEpollClient.invokeAsync(remoteAddr, requestCommand(), new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ }, 3000);
+
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+
+ // Epoll client to epoll server
+ remotingEpollClient.invokeAsync(remoteEpollAddr, requestCommand(), new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ }, 3000);
+
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeOnewayToServer_Success() {
+ final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000);
+ final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000);
+
+ Interceptor interceptor = new Interceptor() {
+ @Override
+ public void beforeRequest(final RequestContext context) {
+ requestFuture.putObject(context.getRequest());
+ requestFuture.release();
+ }
+
+ @Override
+ public void afterResponseReceived(final ResponseContext context) {
+ responseFuture.putObject(context.getResponse());
+ responseFuture.release();
+ }
+ };
+
+ remotingServer.registerInterceptor(interceptor);
+ remotingEpollServer.registerInterceptor(interceptor);
+
+ // Client to epoll server
+ remotingClient.invokeOneWay(remoteEpollAddr, request);
+
+ assertThat(requestFuture.getObject()).isEqualTo(request);
+ assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+
+ // Epoll client to server
+ remotingEpollClient.invokeOneWay(remoteAddr, request);
+
+ assertThat(requestFuture.getObject()).isEqualTo(request);
+ assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+
+ // Epoll client to epoll server
+ remotingEpollClient.invokeOneWay(remoteEpollAddr, request);
+
+ assertThat(requestFuture.getObject()).isEqualTo(request);
+ assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+ }
+}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java
new file mode 100644
index 0000000..d12a5a3
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.RemotingServer;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class NettyRemoteConnectionTest extends BaseTest {
+ private static RemotingServer remotingServer;
+ private static RemotingClient remotingClient;
+
+ private static short requestCode = 123;
+ private RemotingCommand request;
+
+ private static String remoteAddr;
+ private static RemotingChannel channelInServer;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ RemotingClientConfig clientConfig = new RemotingClientConfig();
+ clientConfig.setRemotingShutdownQuietPeriodMillis(0);
+ clientConfig.setRemotingShutdownTimeoutMillis(10);
+
+ RemotingServerConfig serverConfig = new RemotingServerConfig();
+ serverConfig.setRemotingShutdownQuietPeriodMillis(0);
+ serverConfig.setRemotingShutdownTimeoutMillis(10);
+
+ remotingClient = new NettyRemotingClient(clientConfig);
+ remotingServer = new NettyRemotingServer(serverConfig);
+
+ remotingServer.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+ response.payload("Pong".getBytes());
+ return response;
+ }
+ });
+
+ remotingClient.registerRequestProcessor(requestCode, new RequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) {
+ RemotingCommand response = remotingServer.commandFactory().createResponse(request);
+ response.payload("ClientPong".getBytes());
+ return response;
+ }
+ });
+
+ remotingServer.registerChannelEventListener(new ChannelEventListener() {
+ @Override
+ public void onChannelConnect(final RemotingChannel channel) {
+ channelInServer = channel;
+ }
+
+ @Override
+ public void onChannelClose(final RemotingChannel channel) {
+ channelInServer = null;
+ }
+
+ @Override
+ public void onChannelException(final RemotingChannel channel, final Throwable cause) {
+ channelInServer = null;
+ }
+
+ @Override
+ public void onChannelIdle(final RemotingChannel channel) {
+ channelInServer = null;
+ }
+ });
+
+ remotingServer.start();
+ remotingClient.start();
+
+ remoteAddr = "127.0.0.1:" + remotingServer.localListenPort();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ remotingClient.stop();
+ remotingServer.stop();
+ }
+
+ @Before
+ public void setUp0() throws Exception {
+ request = remotingClient.commandFactory().createRequest();
+ request.cmdCode(requestCode);
+
+ if (channelInServer == null) {
+ RemotingCommand rsp = remotingClient.invoke(remoteAddr, request, 3000);
+ assertThat(new String(rsp.payload())).isEqualTo("Pong");
+
+ // Refresh the command
+ request = remotingClient.commandFactory().createRequest();
+ request.cmdCode(requestCode);
+ }
+ }
+
+ @Test
+ public void invokeToServer_Success() {
+ RemotingCommand rsp = remotingClient.invoke(remoteAddr, request, 3000);
+ assertThat(new String(rsp.payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeAsyncToServer_Success() {
+ final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000);
+
+ remotingClient.invokeAsync(remoteAddr, request, new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ }, 3000);
+
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeOnewayToServer_Success() {
+ final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000);
+ final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000);
+
+ remotingServer.registerInterceptor(new Interceptor() {
+ @Override
+ public void beforeRequest(final RequestContext context) {
+ requestFuture.putObject(context.getRequest());
+ requestFuture.release();
+ }
+
+ @Override
+ public void afterResponseReceived(final ResponseContext context) {
+ responseFuture.putObject(context.getResponse());
+ responseFuture.release();
+ }
+ });
+
+ remotingClient.invokeOneWay(remoteAddr, request);
+
+ assertThat(requestFuture.getObject()).isEqualTo(request);
+ assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong");
+ }
+
+ @Test
+ public void invokeToClient_Success() {
+ RemotingCommand rsp = remotingServer.invoke(channelInServer, request, 3000);
+ assertThat(new String(rsp.payload())).isEqualTo("ClientPong");
+ }
+
+ @Test
+ public void invokeAsyncToClient_Success() {
+ final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000);
+
+ remotingServer.invokeAsync(channelInServer, request, new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ }, 3000);
+
+ assertThat(new String(objectFuture.getObject().payload())).isEqualTo("ClientPong");
+ }
+
+ @Test
+ public void invokeOnewayToClient_Success() {
+ final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000);
+ final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000);
+
+ remotingClient.registerInterceptor(new Interceptor() {
+ @Override
+ public void beforeRequest(final RequestContext context) {
+ requestFuture.putObject(context.getRequest());
+ requestFuture.release();
+ }
+
+ @Override
+ public void afterResponseReceived(final ResponseContext context) {
+ responseFuture.putObject(context.getResponse());
+ responseFuture.release();
+ }
+ });
+
+ remotingServer.invokeOneWay(channelInServer, request);
+
+ assertThat(requestFuture.getObject()).isEqualTo(request);
+ assertThat(new String(responseFuture.getObject().payload())).isEqualTo("ClientPong");
+ }
+}
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java
new file mode 100644
index 0000000..aee21fb
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NettyRemotingClientTest extends BaseTest {
+ @Spy
+ private NettyRemotingClient remotingClient = new NettyRemotingClient(clientConfig());
+
+ @Mock
+ private ClientChannelManager channelManager;
+
+ @Mock
+ private Channel mockedChannel;
+
+ @Before
+ public void setUp() {
+ remotingClient.start();
+ remotingClient.setClientChannelManager(channelManager);
+ when(channelManager.createIfAbsent(any(String.class))).thenReturn(mockedChannel);
+ when(mockedChannel.isActive()).thenReturn(true);
+ }
+
+ @After
+ public void tearDown() {
+ remotingClient.stop();
+ }
+
+ @Test
+ public void invoke_Success() {
+ RemotingCommand request = remotingClient.commandFactory().createRequest();
+ final RemotingCommand response = remotingClient.commandFactory().createResponse(request);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ return response;
+ }
+ }).when(remotingClient).invokeWithInterceptor(mockedChannel, request, 3000);
+
+ RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000);
+
+ assertThat(response).isEqualTo(returnedResp);
+ }
+
+ @Test
+ public void invoke_ConnectFailureException() {
+ RemotingCommand request = remotingClient.commandFactory().createRequest();
+ when(mockedChannel.isActive()).thenReturn(false);
+
+ try {
+ RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000);
+ failBecauseExceptionWasNotThrown(RemoteConnectFailureException.class);
+ } catch (Exception e) {
+ assertThat(e).isInstanceOf(RemoteConnectFailureException.class);
+ }
+ }
+
+ @Test
+ public void invoke_TimeoutException() {
+ RemotingCommand request = remotingClient.commandFactory().createRequest();
+
+ doThrow(new RemoteTimeoutException("Timeout exception occurred")).when(remotingClient).invokeWithInterceptor(mockedChannel, request, 3000);
+
+ try {
+ RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000);
+ failBecauseExceptionWasNotThrown(RemoteTimeoutException.class);
+ } catch (Exception e) {
+ assertThat(e).isInstanceOf(RemoteTimeoutException.class);
+ }
+ }
+
+ @Test
+ public void invokeAsync_Success() {
+ RemotingCommand request = remotingClient.commandFactory().createRequest();
+ final RemotingCommand response = remotingClient.commandFactory().createResponse(request);
+
+ final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 100);
+
+ final AsyncHandler asyncHandler = new AsyncHandler() {
+ @Override
+ public void onFailure(final RemotingCommand request, final Throwable cause) {
+
+ }
+
+ @Override
+ public void onSuccess(final RemotingCommand response) {
+ objectFuture.putObject(response);
+ objectFuture.release();
+ }
+ };
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable {
+ asyncHandler.onSuccess(response);
+ return null;
+ }
+ }).when(remotingClient).invokeAsyncWithInterceptor(mockedChannel, request, asyncHandler,3000);
+
+ remotingClient.invokeAsync("127.0.0.1:10911", request, asyncHandler, 3000);
+
+ assertThat(objectFuture.getObject()).isEqualTo(response);
+
+ }
+}
\ No newline at end of file