Fix some bugs and polish the netty transport implementation
diff --git a/benchmarks/remoting-benchmark/pom.xml b/benchmarks/remoting-benchmark/pom.xml
index e103ae8..a18838d 100644
--- a/benchmarks/remoting-benchmark/pom.xml
+++ b/benchmarks/remoting-benchmark/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-x</artifactId>
diff --git a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
index b3754e9..04ca3cf 100644
--- a/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
+++ b/benchmarks/remoting-benchmark/src/main/java/org/apache/rocketmq/benchmarks/remoting/AbstractBenchmark.java
@@ -21,13 +21,39 @@
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class AbstractBenchmark {
     protected static final Logger LOG = LoggerFactory.getLogger(AbstractBenchmark.class);
 
+    public static void main(String[] args) throws InterruptedException {
+        RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingServerConfig());
+
+        server.registerRequestProcessor((short) 1, (channel, request) -> {
+            RemotingCommand response = server.commandFactory().createResponse(request);
+            response.payload("zhouxinyu".getBytes());
+            System.out.println(new String(request.payload()));
+            return response;
+        });
+        server.start();
+
+        RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingClientConfig());
+        client.start();
+
+        RemotingCommand request = client.commandFactory().createRequest();
+        request.cmdCode((short) 1);
+        request.cmdVersion((short) 1);
+        request.payload("hello".getBytes());
+        RemotingCommand response = client.invoke("127.0.0.1:8888", request, 3000);
+        System.out.println(new String(response.payload()));
+
+        client.stop();
+        server.stop();
+    }
+
     /**
      * Standard message sizes.
      */
@@ -35,6 +61,7 @@
         SMALL(16), MEDIUM(1024), LARGE(65536), JUMBO(1048576);
 
         private final int bytes;
+
         MessageSize(int bytes) {
             this.bytes = bytes;
         }
@@ -50,29 +77,4 @@
     public enum ChannelType {
         NIO, LOCAL;
     }
-
-    public static void main(String[] args) throws InterruptedException {
-        RemotingServer server = RemotingBootstrapFactory.createRemotingServer(new RemotingConfig());
-
-        server.registerRequestProcessor((short) 1, (channel, request) -> {
-            RemotingCommand response = server.commandFactory().createResponse(request);
-            response.payload("zhouxinyu".getBytes());
-            System.out.println(new String(request.payload()));
-            return response;
-        });
-        server.start();
-
-        RemotingClient client = RemotingBootstrapFactory.createRemotingClient(new RemotingConfig());
-        client.start();
-
-        RemotingCommand request = client.commandFactory().createRequest();
-        request.cmdCode((short) 1);
-        request.cmdVersion((short) 1);
-        request.payload("hello".getBytes());
-        RemotingCommand response = client.invoke("127.0.0.1:8888", request, 3000);
-        System.out.println(new String(response.payload()));
-
-        client.stop();
-        server.stop();
-    }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
index efa4078..84ae102 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/RemotingBootstrapFactory.java
@@ -19,7 +19,8 @@
 
 import java.util.Properties;
 import org.apache.rocketmq.remoting.api.RemotingClient;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.apache.rocketmq.remoting.impl.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.impl.netty.NettyRemotingServer;
 import org.apache.rocketmq.remoting.internal.BeanUtils;
@@ -30,33 +31,33 @@
  * Remoting Bootstrap entrance.
  */
 public final class RemotingBootstrapFactory {
-    public static RemotingClient createRemotingClient(@NotNull final String fileName) {
-        Properties prop = PropertyUtils.loadProps(fileName);
-        RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class);
+    public static RemotingClient createRemotingClient(@NotNull final RemotingClientConfig config) {
         return new NettyRemotingClient(config);
     }
 
-    public static RemotingClient createRemotingClient(@NotNull final RemotingConfig config) {
+    public static RemotingClient createRemotingClient(@NotNull final String fileName) {
+        Properties prop = PropertyUtils.loadProps(fileName);
+        RemotingClientConfig config = BeanUtils.populate(prop, RemotingClientConfig.class);
         return new NettyRemotingClient(config);
     }
 
     public static RemotingClient createRemotingClient(@NotNull final Properties properties) {
-        RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        RemotingClientConfig config = BeanUtils.populate(properties, RemotingClientConfig.class);
         return new NettyRemotingClient(config);
     }
 
     public static NettyRemotingServer createRemotingServer(@NotNull final String fileName) {
         Properties prop = PropertyUtils.loadProps(fileName);
-        RemotingConfig config = BeanUtils.populate(prop, RemotingConfig.class);
+        RemotingServerConfig config = BeanUtils.populate(prop, RemotingServerConfig.class);
         return new NettyRemotingServer(config);
     }
 
     public static NettyRemotingServer createRemotingServer(@NotNull final Properties properties) {
-        RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        RemotingServerConfig config = BeanUtils.populate(properties, RemotingServerConfig.class);
         return new NettyRemotingServer(config);
     }
 
-    public static NettyRemotingServer createRemotingServer(@NotNull final RemotingConfig config) {
+    public static NettyRemotingServer createRemotingServer(@NotNull final RemotingServerConfig config) {
         return new NettyRemotingServer(config);
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index 8a2aec2..e6c394b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -24,7 +24,6 @@
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
 import org.jetbrains.annotations.Nullable;
 
 public class ResponseFuture {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java
new file mode 100644
index 0000000..8d77388
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingClientConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+public class RemotingClientConfig extends RemotingConfig {
+    private int connectTimeoutMillis = 3000;
+
+    private boolean clientNativeEpollEnable = false;
+
+    private int clientIoThreads = 1;
+    private int clientWorkerThreads = 4;
+
+    private int clientOnewayInvokeSemaphore = 65535;
+    private int clientAsyncInvokeSemaphore = 65535;
+
+    private boolean clientPooledBytebufAllocatorEnable = false;
+
+    private boolean clientCloseSocketIfTimeout = false;
+    private boolean clientShortConnectionEnable = false;
+
+    public boolean isClientNativeEpollEnable() {
+        return clientNativeEpollEnable;
+    }
+
+    public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) {
+        this.clientNativeEpollEnable = clientNativeEpollEnable;
+    }
+
+    public int getClientIoThreads() {
+        return clientIoThreads;
+    }
+
+    public void setClientIoThreads(final int clientIoThreads) {
+        this.clientIoThreads = clientIoThreads;
+    }
+
+    public int getClientWorkerThreads() {
+        return clientWorkerThreads;
+    }
+
+    public void setClientWorkerThreads(final int clientWorkerThreads) {
+        this.clientWorkerThreads = clientWorkerThreads;
+    }
+
+    public int getClientOnewayInvokeSemaphore() {
+        return clientOnewayInvokeSemaphore;
+    }
+
+    public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) {
+        this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
+    }
+
+    public int getClientAsyncInvokeSemaphore() {
+        return clientAsyncInvokeSemaphore;
+    }
+
+    public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) {
+        this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
+    }
+
+    public boolean isClientPooledBytebufAllocatorEnable() {
+        return clientPooledBytebufAllocatorEnable;
+    }
+
+    public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) {
+        this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable;
+    }
+
+    public boolean isClientCloseSocketIfTimeout() {
+        return clientCloseSocketIfTimeout;
+    }
+
+    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
+        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
+    }
+
+    public boolean isClientShortConnectionEnable() {
+        return clientShortConnectionEnable;
+    }
+
+    public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) {
+        this.clientShortConnectionEnable = clientShortConnectionEnable;
+    }
+
+    public int getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+    public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+    @Override
+    public int getOnewayInvokeSemaphore() {
+        return this.clientOnewayInvokeSemaphore;
+    }
+
+    @Override
+    public int getAsyncInvokeSemaphore() {
+        return this.clientAsyncInvokeSemaphore;
+    }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index 9fa79c2..d6f636b 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -17,67 +17,26 @@
 
 package org.apache.rocketmq.remoting.config;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-
-public class RemotingConfig extends TcpSocketConfig {
-    private int connectionMaxRetries = 3;
-    private int connectionChannelReaderIdleSeconds = 0;
-    private int connectionChannelWriterIdleSeconds = 0;
+public abstract class RemotingConfig extends TcpSocketConfig {
     /**
      * IdleStateEvent will be triggered when neither read nor write was
      * performed for the specified period of this time. Specify {@code 0} to
      * disable
      */
+    private int connectionChannelReaderIdleSeconds = 0;
+    private int connectionChannelWriterIdleSeconds = 0;
     private int connectionChannelIdleSeconds = 120;
+
     private int writeBufLowWaterMark = 32 * 10240;
     private int writeBufHighWaterMark = 64 * 10240;
-    private int threadTaskLowWaterMark = 30000;
-    private int threadTaskHighWaterMark = 50000;
-    private int connectionRetryBackoffMillis = 3000;
-    private int serviceThreadBlockQueueSize = 50000;
-    private boolean clientNativeEpollEnable = false;
-    private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int clientConnectionFutureAwaitTimeoutMillis = 3000;
-    private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int clientOnewayInvokeSemaphore = 20480;
 
-    //=============Server configuration==================
-    private int clientAsyncInvokeSemaphore = 20480;
-    private boolean clientPooledBytebufAllocatorEnable = false;
-    private boolean clientCloseSocketIfTimeout = true;
-    private boolean clientShortConnectionEnable = false;
-    private long clientPublishServiceTimeout = 10000;
-    private long clientConsumerServiceTimeout = 10000;
-    private long clientInvokeServiceTimeout = 10000;
-    private int clientMaxRetryCount = 10;
-    private int clientSleepBeforeRetry = 100;
-    private int serverListenPort = 8888;
-    /**
-     * If server only listened 1 port,recommend to set the value to 1
-     */
-    private int serverAcceptorThreads = 1;
-    private int serverIoThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int serverWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int serverOnewayInvokeSemaphore = 256;
-    private int serverAsyncInvokeSemaphore = 6400;
-    private boolean serverNativeEpollEnable = false;
-    private int serverAsyncCallbackExecutorThreads = Runtime.getRuntime().availableProcessors() * 2;
-    private boolean serverPooledBytebufAllocatorEnable = true;
-    private boolean serverAuthOpenEnable = true;
+    private int asyncHandlerExecutorThreads = Runtime.getRuntime().availableProcessors();
 
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
-    }
+    private int publicExecutorThreads = 4;
 
-    public int getConnectionMaxRetries() {
-        return connectionMaxRetries;
-    }
+    public abstract int getOnewayInvokeSemaphore();
 
-    public void setConnectionMaxRetries(final int connectionMaxRetries) {
-        this.connectionMaxRetries = connectionMaxRetries;
-    }
+    public abstract int getAsyncInvokeSemaphore();
 
     public int getConnectionChannelReaderIdleSeconds() {
         return connectionChannelReaderIdleSeconds;
@@ -119,227 +78,19 @@
         this.writeBufHighWaterMark = writeBufHighWaterMark;
     }
 
-    public int getThreadTaskLowWaterMark() {
-        return threadTaskLowWaterMark;
+    public int getAsyncHandlerExecutorThreads() {
+        return asyncHandlerExecutorThreads;
     }
 
-    public void setThreadTaskLowWaterMark(final int threadTaskLowWaterMark) {
-        this.threadTaskLowWaterMark = threadTaskLowWaterMark;
+    public void setAsyncHandlerExecutorThreads(final int asyncHandlerExecutorThreads) {
+        this.asyncHandlerExecutorThreads = asyncHandlerExecutorThreads;
     }
 
-    public int getThreadTaskHighWaterMark() {
-        return threadTaskHighWaterMark;
+    public int getPublicExecutorThreads() {
+        return publicExecutorThreads;
     }
 
-    public void setThreadTaskHighWaterMark(final int threadTaskHighWaterMark) {
-        this.threadTaskHighWaterMark = threadTaskHighWaterMark;
-    }
-
-    public int getConnectionRetryBackoffMillis() {
-        return connectionRetryBackoffMillis;
-    }
-
-    public void setConnectionRetryBackoffMillis(final int connectionRetryBackoffMillis) {
-        this.connectionRetryBackoffMillis = connectionRetryBackoffMillis;
-    }
-
-    public int getServiceThreadBlockQueueSize() {
-        return serviceThreadBlockQueueSize;
-    }
-
-    public void setServiceThreadBlockQueueSize(final int serviceThreadBlockQueueSize) {
-        this.serviceThreadBlockQueueSize = serviceThreadBlockQueueSize;
-    }
-
-    public boolean isClientNativeEpollEnable() {
-        return clientNativeEpollEnable;
-    }
-
-    public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) {
-        this.clientNativeEpollEnable = clientNativeEpollEnable;
-    }
-
-    public int getClientWorkerThreads() {
-        return clientWorkerThreads;
-    }
-
-    public void setClientWorkerThreads(final int clientWorkerThreads) {
-        this.clientWorkerThreads = clientWorkerThreads;
-    }
-
-    public int getClientConnectionFutureAwaitTimeoutMillis() {
-        return clientConnectionFutureAwaitTimeoutMillis;
-    }
-
-    public void setClientConnectionFutureAwaitTimeoutMillis(final int clientConnectionFutureAwaitTimeoutMillis) {
-        this.clientConnectionFutureAwaitTimeoutMillis = clientConnectionFutureAwaitTimeoutMillis;
-    }
-
-    public int getClientAsyncCallbackExecutorThreads() {
-        return clientAsyncCallbackExecutorThreads;
-    }
-
-    public void setClientAsyncCallbackExecutorThreads(final int clientAsyncCallbackExecutorThreads) {
-        this.clientAsyncCallbackExecutorThreads = clientAsyncCallbackExecutorThreads;
-    }
-
-    public int getClientOnewayInvokeSemaphore() {
-        return clientOnewayInvokeSemaphore;
-    }
-
-    public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) {
-        this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore;
-    }
-
-    public int getClientAsyncInvokeSemaphore() {
-        return clientAsyncInvokeSemaphore;
-    }
-
-    public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) {
-        this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore;
-    }
-
-    public boolean isClientPooledBytebufAllocatorEnable() {
-        return clientPooledBytebufAllocatorEnable;
-    }
-
-    public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) {
-        this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable;
-    }
-
-    public boolean isClientCloseSocketIfTimeout() {
-        return clientCloseSocketIfTimeout;
-    }
-
-    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
-        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
-    }
-
-    public boolean isClientShortConnectionEnable() {
-        return clientShortConnectionEnable;
-    }
-
-    public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) {
-        this.clientShortConnectionEnable = clientShortConnectionEnable;
-    }
-
-    public long getClientPublishServiceTimeout() {
-        return clientPublishServiceTimeout;
-    }
-
-    public void setClientPublishServiceTimeout(final long clientPublishServiceTimeout) {
-        this.clientPublishServiceTimeout = clientPublishServiceTimeout;
-    }
-
-    public long getClientConsumerServiceTimeout() {
-        return clientConsumerServiceTimeout;
-    }
-
-    public void setClientConsumerServiceTimeout(final long clientConsumerServiceTimeout) {
-        this.clientConsumerServiceTimeout = clientConsumerServiceTimeout;
-    }
-
-    public long getClientInvokeServiceTimeout() {
-        return clientInvokeServiceTimeout;
-    }
-
-    public void setClientInvokeServiceTimeout(final long clientInvokeServiceTimeout) {
-        this.clientInvokeServiceTimeout = clientInvokeServiceTimeout;
-    }
-
-    public int getClientMaxRetryCount() {
-        return clientMaxRetryCount;
-    }
-
-    public void setClientMaxRetryCount(final int clientMaxRetryCount) {
-        this.clientMaxRetryCount = clientMaxRetryCount;
-    }
-
-    public int getClientSleepBeforeRetry() {
-        return clientSleepBeforeRetry;
-    }
-
-    public void setClientSleepBeforeRetry(final int clientSleepBeforeRetry) {
-        this.clientSleepBeforeRetry = clientSleepBeforeRetry;
-    }
-
-    public int getServerListenPort() {
-        return serverListenPort;
-    }
-
-    public void setServerListenPort(final int serverListenPort) {
-        this.serverListenPort = serverListenPort;
-    }
-
-    public int getServerAcceptorThreads() {
-        return serverAcceptorThreads;
-    }
-
-    public void setServerAcceptorThreads(final int serverAcceptorThreads) {
-        this.serverAcceptorThreads = serverAcceptorThreads;
-    }
-
-    public int getServerIoThreads() {
-        return serverIoThreads;
-    }
-
-    public void setServerIoThreads(final int serverIoThreads) {
-        this.serverIoThreads = serverIoThreads;
-    }
-
-    public int getServerWorkerThreads() {
-        return serverWorkerThreads;
-    }
-
-    public void setServerWorkerThreads(final int serverWorkerThreads) {
-        this.serverWorkerThreads = serverWorkerThreads;
-    }
-
-    public int getServerOnewayInvokeSemaphore() {
-        return serverOnewayInvokeSemaphore;
-    }
-
-    public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) {
-        this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
-    }
-
-    public int getServerAsyncInvokeSemaphore() {
-        return serverAsyncInvokeSemaphore;
-    }
-
-    public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) {
-        this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
-    }
-
-    public boolean isServerNativeEpollEnable() {
-        return serverNativeEpollEnable;
-    }
-
-    public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) {
-        this.serverNativeEpollEnable = serverNativeEpollEnable;
-    }
-
-    public int getServerAsyncCallbackExecutorThreads() {
-        return serverAsyncCallbackExecutorThreads;
-    }
-
-    public void setServerAsyncCallbackExecutorThreads(final int serverAsyncCallbackExecutorThreads) {
-        this.serverAsyncCallbackExecutorThreads = serverAsyncCallbackExecutorThreads;
-    }
-
-    public boolean isServerPooledBytebufAllocatorEnable() {
-        return serverPooledBytebufAllocatorEnable;
-    }
-
-    public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) {
-        this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable;
-    }
-
-    public boolean isServerAuthOpenEnable() {
-        return serverAuthOpenEnable;
-    }
-
-    public void setServerAuthOpenEnable(final boolean serverAuthOpenEnable) {
-        this.serverAuthOpenEnable = serverAuthOpenEnable;
+    public void setPublicExecutorThreads(final int publicExecutorThreads) {
+        this.publicExecutorThreads = publicExecutorThreads;
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java
new file mode 100644
index 0000000..9879364
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingServerConfig.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.config;
+
+public class RemotingServerConfig extends RemotingConfig {
+    private int serverListenPort = 8888;
+    /**
+     * If server only listened 1 port,recommend to set the value to 1
+     */
+    private int serverAcceptorThreads = 1;
+    private int serverIoThreads = 3;
+    private int serverWorkerThreads = 8;
+
+    private int serverOnewayInvokeSemaphore = 256;
+    private int serverAsyncInvokeSemaphore = 64;
+
+    private boolean serverNativeEpollEnable = false;
+    private boolean serverPooledBytebufAllocatorEnable = true;
+
+    public int getServerListenPort() {
+        return serverListenPort;
+    }
+
+    public void setServerListenPort(final int serverListenPort) {
+        this.serverListenPort = serverListenPort;
+    }
+
+    public int getServerAcceptorThreads() {
+        return serverAcceptorThreads;
+    }
+
+    public void setServerAcceptorThreads(final int serverAcceptorThreads) {
+        this.serverAcceptorThreads = serverAcceptorThreads;
+    }
+
+    public int getServerIoThreads() {
+        return serverIoThreads;
+    }
+
+    public void setServerIoThreads(final int serverIoThreads) {
+        this.serverIoThreads = serverIoThreads;
+    }
+
+    public int getServerWorkerThreads() {
+        return serverWorkerThreads;
+    }
+
+    public void setServerWorkerThreads(final int serverWorkerThreads) {
+        this.serverWorkerThreads = serverWorkerThreads;
+    }
+
+    public int getServerOnewayInvokeSemaphore() {
+        return serverOnewayInvokeSemaphore;
+    }
+
+    public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) {
+        this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore;
+    }
+
+    public int getServerAsyncInvokeSemaphore() {
+        return serverAsyncInvokeSemaphore;
+    }
+
+    public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) {
+        this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore;
+    }
+
+    public boolean isServerNativeEpollEnable() {
+        return serverNativeEpollEnable;
+    }
+
+    public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) {
+        this.serverNativeEpollEnable = serverNativeEpollEnable;
+    }
+
+    public boolean isServerPooledBytebufAllocatorEnable() {
+        return serverPooledBytebufAllocatorEnable;
+    }
+
+    public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) {
+        this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable;
+    }
+
+    @Override
+    public int getOnewayInvokeSemaphore() {
+        return this.serverOnewayInvokeSemaphore;
+    }
+
+    @Override
+    public int getAsyncInvokeSemaphore() {
+        return this.serverAsyncInvokeSemaphore;
+    }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
index 4dfcde7..d77bf3d 100755
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java
@@ -23,14 +23,14 @@
  * @see java.net.SocketOptions
  */
 public class TcpSocketConfig {
-    private boolean tcpSoReuseAddress;
-    private boolean tcpSoKeepAlive;
-    private boolean tcpSoNoDelay;
-    private int tcpSoSndBufSize;  // see /proc/sys/net/ipv4/tcp_rmem
-    private int tcpSoRcvBufSize;  // see /proc/sys/net/ipv4/tcp_wmem
-    private int tcpSoBacklogSize;
-    private int tcpSoLinger;
-    private int tcpSoTimeout;
+    private boolean tcpSoReuseAddress = true;
+    private boolean tcpSoKeepAlive = false;
+    private boolean tcpSoNoDelay = true;
+    private int tcpSoSndBufSize = 65535;  // see /proc/sys/net/ipv4/tcp_rmem
+    private int tcpSoRcvBufSize = 65535;  // see /proc/sys/net/ipv4/tcp_wmem
+    private int tcpSoBacklogSize = 1024;
+    private int tcpSoLinger = -1;
+    private int tcpSoTimeoutMillis = 3000;
 
     public boolean isTcpSoReuseAddress() {
         return tcpSoReuseAddress;
@@ -88,11 +88,11 @@
         this.tcpSoLinger = tcpSoLinger;
     }
 
-    public int getTcpSoTimeout() {
-        return tcpSoTimeout;
+    public int getTcpSoTimeoutMillis() {
+        return tcpSoTimeoutMillis;
     }
 
-    public void setTcpSoTimeout(final int tcpSoTimeout) {
-        this.tcpSoTimeout = tcpSoTimeout;
+    public void setTcpSoTimeoutMillis(final int tcpSoTimeoutMillis) {
+        this.tcpSoTimeoutMillis = tcpSoTimeoutMillis;
     }
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
index 8d3ff3a..988c20c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -29,12 +29,12 @@
     // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4);
     public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4;
     public final static byte PROTOCOL_MAGIC = 0x14;
-    private final static char PROPERTY_SEPARATOR = '\n';
-    private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
     final static int REMARK_MAX_LEN = Short.MAX_VALUE;
     final static int PROPERTY_MAX_LEN = 524288; // 512KB
     final static int PAYLOAD_MAX_LEN = 16777216; // 16MB
     public final static int PACKET_MAX_LEN = MIN_PROTOCOL_LEN + REMARK_MAX_LEN + PROPERTY_MAX_LEN + PAYLOAD_MAX_LEN;
+    private final static char PROPERTY_SEPARATOR = '\n';
+    private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
 
     public static void encodeCommand(final RemotingCommand command, final ByteBufferWrapper out) {
         out.writeByte(PROTOCOL_MAGIC);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
index 0b084a0..4f4ec5c 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
@@ -28,7 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,10 +41,10 @@
     final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
     private final Lock lockChannelTables = new ReentrantLock();
     private final Bootstrap clientBootstrap;
-    private final RemotingConfig clientConfig;
+    private final RemotingClientConfig clientConfig;
 
     ClientChannelManager(final Bootstrap bootstrap,
-        final RemotingConfig config) {
+        final RemotingClientConfig config) {
         clientBootstrap = bootstrap;
         clientConfig = config;
     }
@@ -106,7 +106,7 @@
 
         if (cw != null) {
             ChannelFuture channelFuture = cw.getChannelFuture();
-            if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+            if (channelFuture.awaitUninterruptibly(this.clientConfig.getConnectTimeoutMillis())) {
                 if (cw.isActive()) {
                     LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                     return cw.getChannel();
@@ -115,7 +115,7 @@
                     this.closeChannel(addr, cw.getChannel());
                 }
             } else {
-                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getConnectTimeoutMillis(),
                     channelFuture.toString());
                 this.closeChannel(addr, cw.getChannel());
             }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
index 1bf2277..432363d 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
@@ -18,8 +18,8 @@
 package org.apache.rocketmq.remoting.impl.netty;
 
 public enum NettyChannelEventType {
-    ACTIVE,
-    INACTIVE,
+    CONNECT,
+    CLOSE,
     IDLE,
     EXCEPTION
 }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 920a922..cbd0059 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -74,15 +74,20 @@
     private final String remotingInstanceId = UIDGenerator.instance().createUID();
 
     private final ExecutorService publicExecutor;
+    private final ExecutorService asyncHandlerExecutor;
     protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
     private InterceptorGroup interceptorGroup = new InterceptorGroup();
     private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
 
-    NettyRemotingAbstract(RemotingConfig clientConfig) {
-        this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
-        this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
+    NettyRemotingAbstract(RemotingConfig remotingConfig) {
+        this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true);
+        this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true);
         this.publicExecutor = ThreadUtils.newFixedThreadPool(
-            clientConfig.getClientAsyncCallbackExecutorThreads(),
+            remotingConfig.getPublicExecutorThreads(),
+            10000, "Remoting-PublicExecutor", true);
+
+        this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool(
+            remotingConfig.getAsyncHandlerExecutorThreads(),
             10000, "Remoting-PublicExecutor", true);
         this.remotingCommandFactory = new RemotingCommandFactoryImpl();
     }
@@ -133,7 +138,9 @@
 
     @Override
     public void stop() {
+        ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
         ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+        ThreadUtils.shutdownGracefully(asyncHandlerExecutor, 2000, TimeUnit.MILLISECONDS);
         ThreadUtils.shutdownGracefully(channelEventExecutor);
     }
 
@@ -234,16 +241,12 @@
         channel.writeAndFlush(msg);
     }
 
-    public ExecutorService getCallbackExecutor() {
-        return this.publicExecutor;
-    }
-
     /**
      * Execute callback in callback executor. If callback executor is null, run directly in current thread
      */
     private void executeAsyncHandler(final ResponseFuture responseFuture) {
         boolean runInThisThread = false;
-        ExecutorService executor = this.getCallbackExecutor();
+        ExecutorService executor = asyncHandlerExecutor;
         if (executor != null) {
             try {
                 executor.submit(new Runnable() {
@@ -549,10 +552,10 @@
                             case IDLE:
                                 listener.onChannelIdle(channel);
                                 break;
-                            case INACTIVE:
+                            case CLOSE:
                                 listener.onChannelClose(channel);
                                 break;
-                            case ACTIVE:
+                            case CONNECT:
                                 listener.onChannelConnect(channel);
                                 break;
                             case EXCEPTION:
@@ -571,7 +574,7 @@
 
     }
 
-    protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
+    protected class RemotingCommandDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index ce30aa2..6b2796e 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -37,18 +37,16 @@
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 import java.net.SocketAddress;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
 import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
-import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -56,21 +54,21 @@
     private final EventLoopGroup ioGroup;
     private final Class<? extends SocketChannel> socketChannelClass;
 
-    private final RemotingConfig clientConfig;
+    private final RemotingClientConfig clientConfig;
 
     private EventExecutorGroup workerGroup;
     private ClientChannelManager clientChannelManager;
 
-    public NettyRemotingClient(final RemotingConfig clientConfig) {
+    public NettyRemotingClient(final RemotingClientConfig clientConfig) {
         super(clientConfig);
         this.clientConfig = clientConfig;
 
         if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
-            this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+            this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
                 clientConfig.getClientWorkerThreads()));
             socketChannelClass = EpollSocketChannel.class;
         } else {
-            this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
+            this.ioGroup = new NioEventLoopGroup(clientConfig.getClientIoThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
                 clientConfig.getClientWorkerThreads()));
             socketChannelClass = NioSocketChannel.class;
         }
@@ -88,15 +86,14 @@
         this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
-                public void initChannel(SocketChannel ch) throws Exception {
+                public void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(workerGroup,
                         new Decoder(),
                         new Encoder(),
                         new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(),
                             clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()),
                         new ClientConnectionHandler(),
-                        new EventDispatcher(),
-                        new ExceptionHandler());
+                        new RemotingCommandDispatcher());
                 }
             });
 
@@ -108,14 +105,10 @@
     @Override
     public void stop() {
         try {
-            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
-
             clientChannelManager.clear();
 
             this.ioGroup.shutdownGracefully();
 
-            ThreadUtils.shutdownGracefully(channelEventExecutor);
-
             this.workerGroup.shutdownGracefully();
         } catch (Exception e) {
             LOG.warn("RemotingClient stopped error !", e);
@@ -126,10 +119,6 @@
 
     private void applyOptions(Bootstrap bootstrap) {
         if (null != clientConfig) {
-            if (clientConfig.getTcpSoLinger() > 0) {
-                bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger());
-            }
-
             if (clientConfig.getTcpSoSndBufSize() > 0) {
                 bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize());
             }
@@ -137,10 +126,9 @@
                 bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize());
             }
 
-            bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()).
-                option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
+            bootstrap.option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
                 option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()).
-                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()).
+                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeoutMillis()).
                 option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(),
                     clientConfig.getWriteBufHighWaterMark()));
         }
@@ -206,7 +194,7 @@
             LOG.info("Connected from {} to {}.", localAddress, remoteAddress);
             super.connect(ctx, remoteAddress, localAddress, promise);
 
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
         }
 
         @Override
@@ -217,7 +205,7 @@
 
             super.disconnect(ctx, promise);
 
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
         }
 
         @Override
@@ -228,11 +216,11 @@
 
             super.close(ctx, promise);
 
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
         }
 
         @Override
-        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
             if (evt instanceof IdleStateEvent) {
                 IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -246,7 +234,7 @@
         }
 
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
             LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
             NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
             putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index f1e9360..f0dbb45 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -39,12 +39,11 @@
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingServer;
 import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingServerConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
@@ -52,7 +51,7 @@
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
-    private final RemotingConfig serverConfig;
+    private final RemotingServerConfig serverConfig;
 
     private final ServerBootstrap serverBootstrap;
     private final EventLoopGroup bossGroup;
@@ -62,7 +61,7 @@
 
     private int port;
 
-    public NettyRemotingServer(final RemotingConfig serverConfig) {
+    public NettyRemotingServer(final RemotingServerConfig serverConfig) {
         super(serverConfig);
 
         this.serverBootstrap = new ServerBootstrap();
@@ -107,7 +106,7 @@
                         serverConfig.getConnectionChannelWriterIdleSeconds(),
                         serverConfig.getConnectionChannelIdleSeconds()),
                     new ServerConnectionHandler(),
-                    new EventDispatcher());
+                    new RemotingCommandDispatcher());
             }
         });
 
@@ -122,10 +121,6 @@
     @Override
     public void stop() {
         try {
-            ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
-
-            ThreadUtils.shutdownGracefully(channelEventExecutor);
-
             this.bossGroup.shutdownGracefully().syncUninterruptibly();
 
             this.ioGroup.shutdownGracefully().syncUninterruptibly();
@@ -160,11 +155,11 @@
             bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()).
                 childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()).
                 childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()).
-                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout());
-        }
+                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeoutMillis());
 
-        if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
-            bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            if (serverConfig.isServerPooledBytebufAllocatorEnable()) {
+                bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            }
         }
     }
 
@@ -194,28 +189,32 @@
     private class ServerConnectionHandler extends ChannelDuplexHandler {
         @Override
         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} registered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelRegistered(ctx);
         }
 
         @Override
         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} unregistered, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelUnregistered(ctx);
         }
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} became active, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelActive(ctx);
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, ctx.channel()));
         }
 
         @Override
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            LOG.info("Channel {} became inactive, remote address {}.", ctx.channel(), ctx.channel().remoteAddress());
             super.channelInactive(ctx);
-            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, ctx.channel()));
         }
 
         @Override
-        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
             if (evt instanceof IdleStateEvent) {
                 final IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -233,9 +232,9 @@
         }
 
         @Override
-        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
+        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+            LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
             putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
-
             ctx.channel().close().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index aab6dfb..ed7c93a 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -43,7 +43,8 @@
         ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
     }
 
-    protected void runInThreads(final Runnable runnable, int threadsNum, int timeoutMillis) throws InterruptedException {
+    protected void runInThreads(final Runnable runnable, int threadsNum,
+        int timeoutMillis) throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
 
         runInThreads(new Runnable() {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
index b671662..56440e5 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnceTest.java
@@ -21,7 +21,7 @@
 import org.apache.rocketmq.remoting.BaseTest;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class SemaphoreReleaseOnlyOnceTest extends BaseTest {
 
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
index c1274c6..391c8bd 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImplTest.java
@@ -22,7 +22,10 @@
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class RemotingCommandFactoryImplTest {
     private RemotingCommandFactory factory = new RemotingCommandFactoryImpl();
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
index 5686124..4620542 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/command/RequestIdGeneratorTest.java
@@ -19,7 +19,7 @@
 
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 public class RequestIdGeneratorTest {
 
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
index 16084d8..16618e8 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
@@ -26,7 +26,7 @@
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.BaseTest;
-import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.config.RemotingClientConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -60,7 +60,7 @@
         when(channel.close()).thenReturn(channelPromise);
         when(channel.remoteAddress()).thenReturn(new InetSocketAddress(8080));
 
-        channelManager = new ClientChannelManager(clientBootstrap, new RemotingConfig());
+        channelManager = new ClientChannelManager(clientBootstrap, new RemotingClientConfig());
     }
 
     @Test
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
index 9a379e9..1629342 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/EncoderTest.java
@@ -52,7 +52,6 @@
         assertEquals(request, decodedRequest);
     }
 
-
     @Test
     public void encode_LenOverLimit_ChannelClosed() {
         EmbeddedChannel channel = new EmbeddedChannel(new Encoder());