Fix some bugs and polish the netty transport implementation
diff --git a/benchmarks/remoting-benchmark/pom.xml b/benchmarks/remoting-benchmark/pom.xml
index e103ae8..a18838d 100644
--- a/benchmarks/remoting-benchmark/pom.xml
+++ b/benchmarks/remoting-benchmark/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-x</artifactId>
diff --git a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
index b3754e9..04ca3cf 100644
--- a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
+++ b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
@@ -21,13 +21,39 @@
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AbstractBenchmark {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractBenchmark.class);
+ public static void main(String[] args) throws InterruptedException {
+ RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingServerConfig());
+
+ server.registerRequestProcessor((short) 1, (channel, request) -> {
+ RemotingCommand response = server.commandFactory().createResponse(request);
+ response.payload("zhouxinyu".getBytes());
+ System.out.println(new String(request.payload()));
+ return response;
+ });
+ server.start();
+
+ RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingClientConfig());
+ client.start();
+
+ RemotingCommand request = client.commandFactory().createRequest();
+ request.cmdCode((short) 1);
+ request.cmdVersion((short) 1);
+ request.payload("hello".getBytes());
+ RemotingCommand response = client.invoke("127.0.0.1:8888", request, 3000);
+ System.out.println(new String(response.payload()));
+
+ client.stop();
+ server.stop();
+ }
+
/**
* Standard message sizes.
*/
@@ -35,6 +61,7 @@
SMALL(16), MEDIUM(1024), LARGE(65536), JUMBO(1048576);
private final int bytes;
+
MessageSize(int bytes) {
this.bytes = bytes;
}
@@ -50,29 +77,4 @@
public enum ChannelType {
NIO, LOCAL;
}
-
- public static void main(String[] args) throws InterruptedException {
- RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingConfig());
-
- server.registerRequestProcessor((short) 1, (channel, request) -> {
- RemotingCommand response = server.commandFactory().createResponse(request);
- response.payload("zhouxinyu".getBytes());
- System.out.println(new String(request.payload()));
- return response;
- });
- server.start();
-
- RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingConfig());
- client.start();
-
- RemotingCommand request = client.commandFactory().createRequest();
- request.cmdCode((short) 1);
- request.cmdVersion((short) 1);
- request.payload("hello".getBytes());
- RemotingCommand response = client.invoke("127.0.0.1:8888", request, 3000);
- System.out.println(new String(response.payload()));
-
- client.stop();
- server.stop();
- }
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
index efa4078..84ae102 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
@@ -19,7 +19,8 @@
import java.util.Properties;
import org.apache.rocketmq.remoting.api.RemotingClient;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
import org.apache.rocketmq.remoting.impl.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.impl.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.internal.BeanUtils;
@@ -30,33 +31,33 @@
* Remoting Bootstrap entrance.
*/
public final class RemotingBootstrapFactory {
- public static RemotingClient createRemotingClient(@NotNull final String fileName) {
- Properties prop = PropertyUtils.loadProps(fileName);
- RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class);
+ public static RemotingClient createRemotingClient(@NotNull final RemotingClientConfig config) {
return new NettyRemotingClient(config);
}
- public static RemotingClient createRemotingClient(@NotNull final RemotingConfig config) {
+ public static RemotingClient createRemotingClient(@NotNull final String fileName) {
+ Properties prop = PropertyUtils.loadProps(fileName);
+ RemotingClientConfig config = BeanUtils.populate(prop, RemotingClientConfig.class);
return new NettyRemotingClient(config);
}
public static RemotingClient createRemotingClient(@NotNull final Properties properties) {
- RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+ RemotingClientConfig config = BeanUtils.populate(properties, RemotingClientConfig.class);
return new NettyRemotingClient(config);
}
public static NettyRemotingServer createRemotingServer(@NotNull final String fileName) {
Properties prop = PropertyUtils.loadProps(fileName);
- RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class);
+ RemotingServerConfig config = BeanUtils.populate(prop, RemotingServerConfig.class);
return new NettyRemotingServer(config);
}
public static NettyRemotingServer createRemotingServer(@NotNull final Properties properties) {
- RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+ RemotingServerConfig config = BeanUtils.populate(properties, RemotingServerConfig.class);
return new NettyRemotingServer(config);
}
- public static NettyRemotingServer createRemotingServer(@NotNull final RemotingConfig config) {
+ public static NettyRemotingServer createRemotingServer(@NotNull final RemotingServerConfig config) {
return new NettyRemotingServer(config);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index 8a2aec2..e6c394b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -24,7 +24,6 @@
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
import org.jetbrains.annotations.Nullable;
public class ResponseFuture {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java
new file mode 100644
index 0000000..8d77388
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+public class RemotingClientConfig extends RemotingConfig {
+ private int connectTimeoutMillis = 3000;
+
+ private boolean clientNativeEpollEnable = false;
+
+ private int clientIoThreads = 1;
+ private int clientWorkerThreads = 4;
+
+ private int clientOnewayInvokeSemaphore = 65535;
+ private int clientAsyncInvokeSemaphore = 65535;
+
+ private boolean clientPooledBytebufAllocatorEnable = false;
+
+ private boolean clientCloseSocketIfTimeout = false;
+ private boolean clientShortConnectionEnable = false;
+
+ public boolean isClientNativeEpollEnable() {
+ return clientNativeEpollEnable;
+ }
+
+ public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) {
+ this.clientNativeEpollEnable = clientNativeEpollEnable;
+ }
+
+ public int getClientIoThreads() {
+ return clientIoThreads;
+ }
+
+ public void setClientIoThreads(final int clientIoThreads) {
+ this.clientIoThreads = clientIoThreads;
+ }
+
+ public int getClientWorkerThreads() {
+ return clientWorkerThreads;
+ }
+
+ public void setClientWorkerThreads(final int clientWorkerThreads) {
+ this.clientWorkerThreads = clientWorkerThreads;
+ }
+
+ public int getClientOnewayInvokeSemaphore() {
+ return clientOnewayInvokeSemaphore;
+ }
+
+ public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) {
+ this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
+ }
+
+ public int getClientAsyncInvokeSemaphore() {
+ return clientAsyncInvokeSemaphore;
+ }
+
+ public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) {
+ this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
+ }
+
+ public boolean isClientPooledBytebufAllocatorEnable() {
+ return clientPooledBytebufAllocatorEnable;
+ }
+
+ public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) {
+ this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable;
+ }
+
+ public boolean isClientCloseSocketIfTimeout() {
+ return clientCloseSocketIfTimeout;
+ }
+
+ public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
+ this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
+ }
+
+ public boolean isClientShortConnectionEnable() {
+ return clientShortConnectionEnable;
+ }
+
+ public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) {
+ this.clientShortConnectionEnable = clientShortConnectionEnable;
+ }
+
+ public int getConnectTimeoutMillis() {
+ return connectTimeoutMillis;
+ }
+
+ public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
+ this.connectTimeoutMillis = connectTimeoutMillis;
+ }
+
+ @Override
+ public int getOnewayInvokeSemaphore() {
+ return this.clientOnewayInvokeSemaphore;
+ }
+
+ @Override
+ public int getAsyncInvokeSemaphore() {
+ return this.clientAsyncInvokeSemaphore;
+ }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index 9fa79c2..d6f636b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -17,67 +17,26 @@
package org.apache.rocketmq.remoting.config;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-public class RemotingConfig extends TcpSocketConfig {
- private int connectionMaxRetries = 3;
- private int connectionChannelReaderIdleSeconds = 0;
- private int connectionChannelWriterIdleSeconds = 0;
+public abstract class RemotingConfig extends TcpSocketConfig {
/**
* IdleStateEvent will be triggered when neither read nor write was
* performed for the specified period of this time. Specify {@code 0} to
* disable
*/
+ private int connectionChannelReaderIdleSeconds = 0;
+ private int connectionChannelWriterIdleSeconds = 0;
private int connectionChannelIdleSeconds = 120;
+
private int writeBufLowWaterMark = 32 * 10240;
private int writeBufHighWaterMark = 64 * 10240;
- private int threadTaskLowWaterMark = 30000;
- private int threadTaskHighWaterMark = 50000;
- private int connectionRetryBackoffMillis = 3000;
- private int serviceThreadBlockQueueSize = 50000;
- private boolean clientNativeEpollEnable = false;
- private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
- private int clientConnectionFutureAwaitTimeoutMillis = 3000;
- private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
- private int clientOnewayInvokeSemaphore = 20480;
- //=============Server configuration==================
- private int clientAsyncInvokeSemaphore = 20480;
- private boolean clientPooledBytebufAllocatorEnable = false;
- private boolean clientCloseSocketIfTimeout = true;
- private boolean clientShortConnectionEnable = false;
- private long clientPublishServiceTimeout = 10000;
- private long clientConsumerServiceTimeout = 10000;
- private long clientInvokeServiceTimeout = 10000;
- private int clientMaxRetryCount = 10;
- private int clientSleepBeforeRetry = 100;
- private int serverListenPort = 8888;
- /**
- * If server only listened 1 port,recommend to set the value to 1
- */
- private int serverAcceptorThreads = 1;
- private int serverIoThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
- private int serverWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
- private int serverOnewayInvokeSemaphore = 256;
- private int serverAsyncInvokeSemaphore = 6400;
- private boolean serverNativeEpollEnable = false;
- private int serverAsyncCallbackExecutorThreads = Runtime.getRuntime().availableProcessors() * 2;
- private boolean serverPooledBytebufAllocatorEnable = true;
- private boolean serverAuthOpenEnable = true;
+ private int asyncHandlerExecutorThreads = Runtime.getRuntime().availableProcessors();
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
- }
+ private int publicExecutorThreads = 4;
- public int getConnectionMaxRetries() {
- return connectionMaxRetries;
- }
+ public abstract int getOnewayInvokeSemaphore();
- public void setConnectionMaxRetries(final int connectionMaxRetries) {
- this.connectionMaxRetries = connectionMaxRetries;
- }
+ public abstract int getAsyncInvokeSemaphore();
public int getConnectionChannelReaderIdleSeconds() {
return connectionChannelReaderIdleSeconds;
@@ -119,227 +78,19 @@
this.writeBufHighWaterMark = writeBufHighWaterMark;
}
- public int getThreadTaskLowWaterMark() {
- return threadTaskLowWaterMark;
+ public int getAsyncHandlerExecutorThreads() {
+ return asyncHandlerExecutorThreads;
}
- public void setThreadTaskLowWaterMark(final int threadTaskLowWaterMark) {
- this.threadTaskLowWaterMark = threadTaskLowWaterMark;
+ public void setAsyncHandlerExecutorThreads(final int asyncHandlerExecutorThreads) {
+ this.asyncHandlerExecutorThreads = asyncHandlerExecutorThreads;
}
- public int getThreadTaskHighWaterMark() {
- return threadTaskHighWaterMark;
+ public int getPublicExecutorThreads() {
+ return publicExecutorThreads;
}
- public void setThreadTaskHighWaterMark(final int threadTaskHighWaterMark) {
- this.threadTaskHighWaterMark = threadTaskHighWaterMark;
- }
-
- public int getConnectionRetryBackoffMillis() {
- return connectionRetryBackoffMillis;
- }
-
- public void setConnectionRetryBackoffMillis(final int connectionRetryBackoffMillis) {
- this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
- }
-
- public int getServiceThreadBlockQueueSize() {
- return serviceThreadBlockQueueSize;
- }
-
- public void setServiceThreadBlockQueueSize(final int serviceThreadBlockQueueSize) {
- this.serviceThreadBlockQueueSize = serviceThreadBlockQueueSize;
- }
-
- public boolean isClientNativeEpollEnable() {
- return clientNativeEpollEnable;
- }
-
- public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) {
- this.clientNativeEpollEnable = clientNativeEpollEnable;
- }
-
- public int getClientWorkerThreads() {
- return clientWorkerThreads;
- }
-
- public void setClientWorkerThreads(final int clientWorkerThreads) {
- this.clientWorkerThreads = clientWorkerThreads;
- }
-
- public int getClientConnectionFutureAwaitTimeoutMillis() {
- return clientConnectionFutureAwaitTimeoutMillis;
- }
-
- public void setClientConnectionFutureAwaitTimeoutMillis(final int clientConnectionFutureAwaitTimeoutMillis) {
- this.clientConnectionFutureAwaitTimeoutMillis = clientConnectionFutureAwaitTimeoutMillis;
- }
-
- public int getClientAsyncCallbackExecutorThreads() {
- return clientAsyncCallbackExecutorThreads;
- }
-
- public void setClientAsyncCallbackExecutorThreads(final int clientAsyncCallbackExecutorThreads) {
- this.clientAsyncCallbackExecutorThreads = clientAsyncCallbackExecutorThreads;
- }
-
- public int getClientOnewayInvokeSemaphore() {
- return clientOnewayInvokeSemaphore;
- }
-
- public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) {
- this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
- }
-
- public int getClientAsyncInvokeSemaphore() {
- return clientAsyncInvokeSemaphore;
- }
-
- public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) {
- this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
- }
-
- public boolean isClientPooledBytebufAllocatorEnable() {
- return clientPooledBytebufAllocatorEnable;
- }
-
- public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) {
- this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable;
- }
-
- public boolean isClientCloseSocketIfTimeout() {
- return clientCloseSocketIfTimeout;
- }
-
- public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
- this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
- }
-
- public boolean isClientShortConnectionEnable() {
- return clientShortConnectionEnable;
- }
-
- public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) {
- this.clientShortConnectionEnable = clientShortConnectionEnable;
- }
-
- public long getClientPublishServiceTimeout() {
- return clientPublishServiceTimeout;
- }
-
- public void setClientPublishServiceTimeout(final long clientPublishServiceTimeout) {
- this.clientPublishServiceTimeout = clientPublishServiceTimeout;
- }
-
- public long getClientConsumerServiceTimeout() {
- return clientConsumerServiceTimeout;
- }
-
- public void setClientConsumerServiceTimeout(final long clientConsumerServiceTimeout) {
- this.clientConsumerServiceTimeout = clientConsumerServiceTimeout;
- }
-
- public long getClientInvokeServiceTimeout() {
- return clientInvokeServiceTimeout;
- }
-
- public void setClientInvokeServiceTimeout(final long clientInvokeServiceTimeout) {
- this.clientInvokeServiceTimeout = clientInvokeServiceTimeout;
- }
-
- public int getClientMaxRetryCount() {
- return clientMaxRetryCount;
- }
-
- public void setClientMaxRetryCount(final int clientMaxRetryCount) {
- this.clientMaxRetryCount = clientMaxRetryCount;
- }
-
- public int getClientSleepBeforeRetry() {
- return clientSleepBeforeRetry;
- }
-
- public void setClientSleepBeforeRetry(final int clientSleepBeforeRetry) {
- this.clientSleepBeforeRetry = clientSleepBeforeRetry;
- }
-
- public int getServerListenPort() {
- return serverListenPort;
- }
-
- public void setServerListenPort(final int serverListenPort) {
- this.serverListenPort = serverListenPort;
- }
-
- public int getServerAcceptorThreads() {
- return serverAcceptorThreads;
- }
-
- public void setServerAcceptorThreads(final int serverAcceptorThreads) {
- this.serverAcceptorThreads = serverAcceptorThreads;
- }
-
- public int getServerIoThreads() {
- return serverIoThreads;
- }
-
- public void setServerIoThreads(final int serverIoThreads) {
- this.serverIoThreads = serverIoThreads;
- }
-
- public int getServerWorkerThreads() {
- return serverWorkerThreads;
- }
-
- public void setServerWorkerThreads(final int serverWorkerThreads) {
- this.serverWorkerThreads = serverWorkerThreads;
- }
-
- public int getServerOnewayInvokeSemaphore() {
- return serverOnewayInvokeSemaphore;
- }
-
- public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) {
- this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
- }
-
- public int getServerAsyncInvokeSemaphore() {
- return serverAsyncInvokeSemaphore;
- }
-
- public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) {
- this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
- }
-
- public boolean isServerNativeEpollEnable() {
- return serverNativeEpollEnable;
- }
-
- public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) {
- this.serverNativeEpollEnable = serverNativeEpollEnable;
- }
-
- public int getServerAsyncCallbackExecutorThreads() {
- return serverAsyncCallbackExecutorThreads;
- }
-
- public void setServerAsyncCallbackExecutorThreads(final int serverAsyncCallbackExecutorThreads) {
- this.serverAsyncCallbackExecutorThreads = serverAsyncCallbackExecutorThreads;
- }
-
- public boolean isServerPooledBytebufAllocatorEnable() {
- return serverPooledBytebufAllocatorEnable;
- }
-
- public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) {
- this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable;
- }
-
- public boolean isServerAuthOpenEnable() {
- return serverAuthOpenEnable;
- }
-
- public void setServerAuthOpenEnable(final boolean serverAuthOpenEnable) {
- this.serverAuthOpenEnable = serverAuthOpenEnable;
+ public void setPublicExecutorThreads(final int publicExecutorThreads) {
+ this.publicExecutorThreads = publicExecutorThreads;
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java
new file mode 100644
index 0000000..9879364
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+public class RemotingServerConfig extends RemotingConfig {
+ private int serverListenPort = 8888;
+ /**
+ * If server only listened 1 port,recommend to set the value to 1
+ */
+ private int serverAcceptorThreads = 1;
+ private int serverIoThreads = 3;
+ private int serverWorkerThreads = 8;
+
+ private int serverOnewayInvokeSemaphore = 256;
+ private int serverAsyncInvokeSemaphore = 64;
+
+ private boolean serverNativeEpollEnable = false;
+ private boolean serverPooledBytebufAllocatorEnable = true;
+
+ public int getServerListenPort() {
+ return serverListenPort;
+ }
+
+ public void setServerListenPort(final int serverListenPort) {
+ this.serverListenPort = serverListenPort;
+ }
+
+ public int getServerAcceptorThreads() {
+ return serverAcceptorThreads;
+ }
+
+ public void setServerAcceptorThreads(final int serverAcceptorThreads) {
+ this.serverAcceptorThreads = serverAcceptorThreads;
+ }
+
+ public int getServerIoThreads() {
+ return serverIoThreads;
+ }
+
+ public void setServerIoThreads(final int serverIoThreads) {
+ this.serverIoThreads = serverIoThreads;
+ }
+
+ public int getServerWorkerThreads() {
+ return serverWorkerThreads;
+ }
+
+ public void setServerWorkerThreads(final int serverWorkerThreads) {
+ this.serverWorkerThreads = serverWorkerThreads;
+ }
+
+ public int getServerOnewayInvokeSemaphore() {
+ return serverOnewayInvokeSemaphore;
+ }
+
+ public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) {
+ this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
+ }
+
+ public int getServerAsyncInvokeSemaphore() {
+ return serverAsyncInvokeSemaphore;
+ }
+
+ public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) {
+ this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
+ }
+
+ public boolean isServerNativeEpollEnable() {
+ return serverNativeEpollEnable;
+ }
+
+ public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) {
+ this.serverNativeEpollEnable = serverNativeEpollEnable;
+ }
+
+ public boolean isServerPooledBytebufAllocatorEnable() {
+ return serverPooledBytebufAllocatorEnable;
+ }
+
+ public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) {
+ this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable;
+ }
+
+ @Override
+ public int getOnewayInvokeSemaphore() {
+ return this.serverOnewayInvokeSemaphore;
+ }
+
+ @Override
+ public int getAsyncInvokeSemaphore() {
+ return this.serverAsyncInvokeSemaphore;
+ }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
index 4dfcde7..d77bf3d 100755
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
@@ -23,14 +23,14 @@
* @see java.net.SocketOptions
*/
public class TcpSocketConfig {
- private boolean tcpSoReuseAddress;
- private boolean tcpSoKeepAlive;
- private boolean tcpSoNoDelay;
- private int tcpSoSndBufSize; // see /proc/sys/net/ipv4/tcp_rmem
- private int tcpSoRcvBufSize; // see /proc/sys/net/ipv4/tcp_wmem
- private int tcpSoBacklogSize;
- private int tcpSoLinger;
- private int tcpSoTimeout;
+ private boolean tcpSoReuseAddress = true;
+ private boolean tcpSoKeepAlive = false;
+ private boolean tcpSoNoDelay = true;
+ private int tcpSoSndBufSize = 65535; // see /proc/sys/net/ipv4/tcp_rmem
+ private int tcpSoRcvBufSize = 65535; // see /proc/sys/net/ipv4/tcp_wmem
+ private int tcpSoBacklogSize = 1024;
+ private int tcpSoLinger = -1;
+ private int tcpSoTimeoutMillis = 3000;
public boolean isTcpSoReuseAddress() {
return tcpSoReuseAddress;
@@ -88,11 +88,11 @@
this.tcpSoLinger = tcpSoLinger;
}
- public int getTcpSoTimeout() {
- return tcpSoTimeout;
+ public int getTcpSoTimeoutMillis() {
+ return tcpSoTimeoutMillis;
}
- public void setTcpSoTimeout(final int tcpSoTimeout) {
- this.tcpSoTimeout = tcpSoTimeout;
+ public void setTcpSoTimeoutMillis(final int tcpSoTimeoutMillis) {
+ this.tcpSoTimeoutMillis = tcpSoTimeoutMillis;
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
index 8d3ff3a..988c20c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -29,12 +29,12 @@
// + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4);
public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4;
public final static byte PROTOCOL_MAGIC = 0x14;
- private final static char PROPERTY_SEPARATOR = '\n';
- private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
final static int REMARK_MAX_LEN = Short.MAX_VALUE;
final static int PROPERTY_MAX_LEN = 524288; // 512KB
final static int PAYLOAD_MAX_LEN = 16777216; // 16MB
public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN;
+ private final static char PROPERTY_SEPARATOR = '\n';
+ private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) {
out.writeByte(PROTOCOL_MAGIC);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
index 0b084a0..4f4ec5c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
@@ -28,7 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +41,10 @@
final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
private final Lock lockChannelTables = new ReentrantLock();
private final Bootstrap clientBootstrap;
- private final RemotingConfig clientConfig;
+ private final RemotingClientConfig clientConfig;
ClientChannelManager(final Bootstrap bootstrap,
- final RemotingConfig config) {
+ final RemotingClientConfig config) {
clientBootstrap = bootstrap;
clientConfig = config;
}
@@ -106,7 +106,7 @@
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
- if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+ if (channelFuture.awaitUninterruptibly(this.clientConfig.getConnectTimeoutMillis())) {
if (cw.isActive()) {
LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
@@ -115,7 +115,7 @@
this.closeChannel(addr, cw.getChannel());
}
} else {
- LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+ LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
this.closeChannel(addr, cw.getChannel());
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
index 1bf2277..432363d 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.remoting.impl.netty;
public enum NettyChannelEventType {
- ACTIVE,
- INACTIVE,
+ CONNECT,
+ CLOSE,
IDLE,
EXCEPTION
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 920a922..cbd0059 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -74,15 +74,20 @@
private final String remotingInstanceId = UIDGenerator.instance().createUID();
private final ExecutorService publicExecutor;
+ private final ExecutorService asyncHandlerExecutor;
protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
private InterceptorGroup interceptorGroup = new InterceptorGroup();
private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
- NettyRemotingAbstract(RemotingConfig clientConfig) {
- this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
- this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
+ NettyRemotingAbstract(RemotingConfig remotingConfig) {
+ this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true);
+ this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true);
this.publicExecutor = ThreadUtils.newFixedThreadPool(
- clientConfig.getClientAsyncCallbackExecutorThreads(),
+ remotingConfig.getPublicExecutorThreads(),
+ 10000, "Remoting-PublicExecutor", true);
+
+ this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool(
+ remotingConfig.getAsyncHandlerExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
this.remotingCommandFactory = new RemotingCommandFactoryImpl();
}
@@ -133,7 +138,9 @@
@Override
public void stop() {
+ ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+ ThreadUtils.shutdownGracefully(asyncHandlerExecutor, 2000, TimeUnit.MILLISECONDS);
ThreadUtils.shutdownGracefully(channelEventExecutor);
}
@@ -234,16 +241,12 @@
channel.writeAndFlush(msg);
}
- public ExecutorService getCallbackExecutor() {
- return this.publicExecutor;
- }
-
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
*/
private void executeAsyncHandler(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = asyncHandlerExecutor;
if (executor != null) {
try {
executor.submit(new Runnable() {
@@ -549,10 +552,10 @@
case IDLE:
listener.onChannelIdle(channel);
break;
- case INACTIVE:
+ case CLOSE:
listener.onChannelClose(channel);
break;
- case ACTIVE:
+ case CONNECT:
listener.onChannelConnect(channel);
break;
case EXCEPTION:
@@ -571,7 +574,7 @@
}
- protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
+ protected class RemotingCommandDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index ce30aa2..6b2796e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -37,18 +37,16 @@
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.SocketAddress;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
import org.apache.rocketmq.remoting.internal.JvmUtils;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -56,21 +54,21 @@
private final EventLoopGroup ioGroup;
private final Class<? extends SocketChannel> socketChannelClass;
- private final RemotingConfig clientConfig;
+ private final RemotingClientConfig clientConfig;
private EventExecutorGroup workerGroup;
private ClientChannelManager clientChannelManager;
- public NettyRemotingClient(final RemotingConfig clientConfig) {
+ public NettyRemotingClient(final RemotingClientConfig clientConfig) {
super(clientConfig);
this.clientConfig = clientConfig;
if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
- this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+ this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
clientConfig.getClientWorkerThreads()));
socketChannelClass = EpollSocketChannel.class;
} else {
- this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
+ this.ioGroup = new NioEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
clientConfig.getClientWorkerThreads()));
socketChannelClass = NioSocketChannel.class;
}
@@ -88,15 +86,14 @@
this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
- public void initChannel(SocketChannel ch) throws Exception {
+ public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(workerGroup,
new Decoder(),
new Encoder(),
new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
new ClientConnectionHandler(),
- new EventDispatcher(),
- new ExceptionHandler());
+ new RemotingCommandDispatcher());
}
});
@@ -108,14 +105,10 @@
@Override
public void stop() {
try {
- ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
-
clientChannelManager.clear();
this.ioGroup.shutdownGracefully();
- ThreadUtils.shutdownGracefully(channelEventExecutor);
-
this.workerGroup.shutdownGracefully();
} catch (Exception e) {
LOG.warn("RemotingClient stopped error !", e);
@@ -126,10 +119,6 @@
private void applyOptions(Bootstrap bootstrap) {
if (null != clientConfig) {
- if (clientConfig.getTcpSoLinger() > 0) {
- bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger());
- }
-
if (clientConfig.getTcpSoSndBufSize() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize());
}
@@ -137,10 +126,9 @@
bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize());
}
- bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()).
- option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()).
- option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()).
+ option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeoutMillis()).
option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(),
clientConfig.getWriteBufHighWaterMark()));
}
@@ -206,7 +194,7 @@
LOG.info("Connected from {} to {}.", localAddress, remoteAddress);
super.connect(ctx, remoteAddress, localAddress, promise);
- putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+ putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
}
@Override
@@ -217,7 +205,7 @@
super.disconnect(ctx, promise);
- putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+ putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
}
@Override
@@ -228,11 +216,11 @@
super.close(ctx, promise);
- putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+ putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
}
@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -246,7 +234,7 @@
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index f1e9360..f0dbb45 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -39,12 +39,11 @@
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingServer;
import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
@@ -52,7 +51,7 @@
import org.apache.rocketmq.remoting.internal.JvmUtils;
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
- private final RemotingConfig serverConfig;
+ private final RemotingServerConfig serverConfig;
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup bossGroup;
@@ -62,7 +61,7 @@
private int port;
- public NettyRemotingServer(final RemotingConfig serverConfig) {
+ public NettyRemotingServer(final RemotingServerConfig serverConfig) {
super(serverConfig);
this.serverBootstrap = new ServerBootstrap();
@@ -107,7 +106,7 @@
serverConfig.getConnectionChannelWriterIdleSeconds(),
serverConfig.getConnectionChannelIdleSeconds()),
new ServerConnectionHandler(),
- new EventDispatcher());
+ new RemotingCommandDispatcher());
}
});
@@ -122,10 +121,6 @@
@Override
public void stop() {
try {
- ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
-
- ThreadUtils.shutdownGracefully(channelEventExecutor);
-
this.bossGroup.shutdownGracefully().syncUninterruptibly();
this.ioGroup.shutdownGracefully().syncUninterruptibly();
@@ -160,11 +155,11 @@
bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
- option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
- }
+ option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeoutMillis());
- if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
- bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
+ bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ }
}
}
@@ -194,28 +189,32 @@
private class ServerConnectionHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ LOG.info("Channel {} registered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ LOG.info("Channel {} unregistered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ LOG.info("Channel {} became active, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
super.channelActive(ctx);
- putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+ putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ LOG.info("Channel {} became inactive, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
super.channelInactive(ctx);
- putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+ putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
}
@Override
- public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+ public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
final IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -233,9 +232,9 @@
}
@Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
-
ctx.channel().close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index aab6dfb..ed7c93a 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -43,7 +43,8 @@
ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
}
- protected void runInThreads(final Runnable runnable, int threadsNum, int timeoutMillis) throws InterruptedException {
+ protected void runInThreads(final Runnable runnable, int threadsNum,
+ int timeoutMillis) throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
runInThreads(new Runnable() {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
index b671662..56440e5 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
@@ -21,7 +21,7 @@
import org.apache.rocketmq.remoting.BaseTest;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
public class SemaphoreReleaseOnlyOnceTest extends BaseTest {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
index c1274c6..391c8bd 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
@@ -22,7 +22,10 @@
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class RemotingCommandFactoryImplTest {
private RemotingCommandFactory factory = new RemotingCommandFactoryImpl();
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
index 5686124..4620542 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
@@ -19,7 +19,7 @@
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
public class RequestIdGeneratorTest {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
index 16084d8..16618e8 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
@@ -26,7 +26,7 @@
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.BaseTest;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -60,7 +60,7 @@
when(channel.close()).thenReturn(channelPromise);
when(channel.remoteAddress()).thenReturn(new InetSocketAddress(8080));
- channelManager = new ClientChannelManager(clientBootstrap, new RemotingConfig());
+ channelManager = new ClientChannelManager(clientBootstrap, new RemotingClientConfig());
}
@Test
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
index 9a379e9..1629342 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
@@ -52,7 +52,6 @@
assertEquals(request, decodedRequest);
}
-
@Test
public void encode_LenOverLimit_ChannelClosed() {
EmbeddedChannel channel = new EmbeddedChannel(new Encoder());