Polish interceptor and async handler
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java
index 106431b..322503c 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java
@@ -26,9 +26,7 @@
  * @since 1.0.0
  */
 public interface AsyncHandler {
-    void onFailure(RemotingCommand command);
+    void onFailure(RemotingCommand request, Throwable cause);
 
-    void onSuccess(RemotingCommand command);
-
-    void onTimeout(long costTimeMillis, long timeoutMillis);
+    void onSuccess(RemotingCommand response);
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java
index 1603af4..ee67272 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java
@@ -24,5 +24,5 @@
 
     void invokeAsync(String address, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis);
 
-    void invokeOneWay(String address, RemotingCommand request, long timeoutMillis);
+    void invokeOneWay(String address, RemotingCommand request);
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java
index f36c83c..785f83e 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java
@@ -25,8 +25,7 @@
 
     RemotingCommand invoke(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis);
 
-    void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler,
-        long timeoutMillis);
+    void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis);
 
-    void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis);
+    void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request);
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
index 57f3743..9cb59b4 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
@@ -25,14 +25,13 @@
 public interface RemotingService extends ConnectionService, ObjectLifecycle {
     void registerInterceptor(Interceptor interceptor);
 
-    void registerRequestProcessor(final short requestCode, final RequestProcessor processor,
-        final ExecutorService executor);
+    void registerRequestProcessor(short requestCode, RequestProcessor processor, ExecutorService executor);
 
-    void registerRequestProcessor(final short requestCode, final RequestProcessor processor);
+    void registerRequestProcessor(short requestCode, RequestProcessor processor);
 
-    void unregisterRequestProcessor(final short requestCode);
+    void unregisterRequestProcessor(short requestCode);
 
-    Pair<RequestProcessor, ExecutorService> processor(final short requestCode);
+    Pair<RequestProcessor, ExecutorService> processor(short requestCode);
 
     String remotingInstanceId();
 
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
deleted file mode 100644
index 2452309..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.interceptor;
-
-import org.apache.rocketmq.remoting.api.RemotingEndPoint;
-import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-
-public class ExceptionContext extends RequestContext {
-    private Throwable exception;
-    private String remark;
-
-    public ExceptionContext(RemotingEndPoint remotingEndPoint, String remoteAddr, RemotingCommand request,
-        Throwable exception, String remark) {
-        super(remotingEndPoint, remoteAddr, request);
-        this.remotingEndPoint = remotingEndPoint;
-        this.remoteAddr = remoteAddr;
-        this.request = request;
-        this.exception = exception;
-        this.remark = remark;
-    }
-
-    public RemotingEndPoint getRemotingEndPoint() {
-        return remotingEndPoint;
-    }
-
-    public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
-        this.remotingEndPoint = remotingEndPoint;
-    }
-
-    public String getRemoteAddr() {
-        return remoteAddr;
-    }
-
-    public void setRemoteAddr(String remoteAddr) {
-        this.remoteAddr = remoteAddr;
-    }
-
-    public RemotingCommand getRequest() {
-        return request;
-    }
-
-    public void setRequest(RemotingCommand request) {
-        this.request = request;
-    }
-
-    public Throwable getException() {
-        return exception;
-    }
-
-    public void setException(Throwable exception) {
-        this.exception = exception;
-    }
-
-    public String getRemark() {
-        return remark;
-    }
-
-    public void setRemark(String remark) {
-        this.remark = remark;
-    }
-}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
index 62257ef..98a04cb 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
@@ -18,9 +18,7 @@
 package org.apache.rocketmq.remoting.api.interceptor;
 
 public interface Interceptor {
-    void beforeRequest(final RequestContext context);
+    void beforeRequest(RequestContext context);
 
-    void afterResponseReceived(final ResponseContext context);
-
-    void onException(final ExceptionContext context);
+    void afterResponseReceived(ResponseContext context);
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
index 9ffc696..e7baeed 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
@@ -21,7 +21,7 @@
 import java.util.List;
 
 public class InterceptorGroup {
-    private final List<Interceptor> interceptors = new ArrayList<Interceptor>();
+    private final List<Interceptor> interceptors = new ArrayList<>();
 
     public void registerInterceptor(final Interceptor interceptor) {
         if (interceptor != null) {
@@ -40,10 +40,4 @@
             interceptor.afterResponseReceived(context);
         }
     }
-
-    public void onException(final ExceptionContext context) {
-        for (Interceptor interceptor : interceptors) {
-            interceptor.onException(context);
-        }
-    }
 }
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
index c7f7a9b..005aa28 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
@@ -34,30 +34,6 @@
         this.response = response;
     }
 
-    public RemotingEndPoint getRemotingEndPoint() {
-        return remotingEndPoint;
-    }
-
-    public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
-        this.remotingEndPoint = remotingEndPoint;
-    }
-
-    public String getRemoteAddr() {
-        return remoteAddr;
-    }
-
-    public void setRemoteAddr(String remoteAddr) {
-        this.remoteAddr = remoteAddr;
-    }
-
-    public RemotingCommand getRequest() {
-        return request;
-    }
-
-    public void setRequest(RemotingCommand request) {
-        this.request = request;
-    }
-
     @Override
     public String toString() {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
similarity index 67%
rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index 92f501f..e614963 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -23,16 +23,14 @@
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
-import org.apache.rocketmq.remoting.api.RemotingEndPoint;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
 import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
-import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.jetbrains.annotations.Nullable;
 
-public class ResponseResult {
+public class ResponseFuture {
     private final long beginTimestamp = System.currentTimeMillis();
     private final CountDownLatch countDownLatch = new CountDownLatch(1);
-    private final AtomicBoolean interceptorExecuted = new AtomicBoolean(false);
+    private final AtomicBoolean asyncHandlerExecuted = new AtomicBoolean(false);
 
     private int requestId;
     private long timeoutMillis;
@@ -47,54 +45,27 @@
     private InterceptorGroup interceptorGroup;
     private String remoteAddr;
 
-    public ResponseResult(int requestId, long timeoutMillis, AsyncHandler asyncHandler, SemaphoreReleaseOnlyOnce once) {
+    public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler, @Nullable SemaphoreReleaseOnlyOnce once) {
         this.requestId = requestId;
         this.timeoutMillis = timeoutMillis;
         this.asyncHandler = asyncHandler;
         this.once = once;
     }
 
-    public ResponseResult(int requestId, long timeoutMillis) {
+    public ResponseFuture(int requestId, long timeoutMillis) {
         this.requestId = requestId;
         this.timeoutMillis = timeoutMillis;
     }
 
-    public void executeRequestSendFailed() {
-        if (this.interceptorExecuted.compareAndSet(false, true)) {
-            try {
-                interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
-                    cause, "REQUEST_SEND_FAILED"));
-            } catch (Throwable e) {
-            }
-            //Sync call
-            if (null != asyncHandler) {
-                asyncHandler.onFailure(requestCommand);
-            }
-        }
-    }
-
-    public void executeCallbackArrived(final RemotingCommand response) {
-        if (this.interceptorExecuted.compareAndSet(false, true)) {
-            try {
-                interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, this.remoteAddr,
-                    this.requestCommand, response));
-            } catch (Throwable e) {
-            }
-            if (null != asyncHandler) {
-                asyncHandler.onSuccess(response);
-            }
-        }
-    }
-
-    public void onTimeout(long costTimeMillis, long timoutMillis) {
-        if (this.interceptorExecuted.compareAndSet(false, true)) {
-            try {
-                interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
-                    null, "CALLBACK_TIMEOUT"));
-            } catch (Throwable ignore) {
-            }
-            if (null != asyncHandler) {
-                asyncHandler.onTimeout(costTimeMillis, timoutMillis);
+    public void executeAsyncHandler() {
+        if (asyncHandler != null) {
+            if (this.asyncHandlerExecuted.compareAndSet(false, true)) {
+                if (cause != null) {
+                    asyncHandler.onFailure(requestCommand, cause);
+                } else {
+                    assert responseCommand != null;
+                    asyncHandler.onSuccess(responseCommand);
+                }
             }
         }
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index cbb211e..ac989f8 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -24,7 +24,8 @@
 import io.netty.channel.SimpleChannelInboundHandler;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -44,14 +45,13 @@
 import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
 import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
 import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
 import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
 import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
 import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
 import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.ResponseResult;
+import org.apache.rocketmq.remoting.common.ResponseFuture;
 import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
 import org.apache.rocketmq.remoting.config.RemotingConfig;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
@@ -68,7 +68,7 @@
     protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
     private final Semaphore semaphoreOneway;
     private final Semaphore semaphoreAsync;
-    private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
+    private final Map<Integer, ResponseFuture> ackTables = new ConcurrentHashMap<Integer, ResponseFuture>(256);
     private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>();
     private final RemotingCommandFactory remotingCommandFactory;
     private final String remotingInstanceId = UIDGenerator.instance().createUID();
@@ -101,23 +101,23 @@
     }
 
     void scanResponseTable() {
-        Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
-        while (iterator.hasNext()) {
-            Map.Entry<Integer, ResponseResult> next = iterator.next();
-            ResponseResult result = next.getValue();
+        final List<Integer> rList = new ArrayList<>();
 
-            if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
-                iterator.remove();
-                try {
-                    long timeoutMillis = result.getTimeoutMillis();
-                    long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
-                    result.onTimeout(timeoutMillis, costTimeMillis);
-                } catch (Throwable e) {
-                    LOG.warn("Error occurred when execute timeout callback !", e);
-                } finally {
-                    result.release();
-                    LOG.warn("Removed timeout request {} ", result);
-                }
+        for (final Map.Entry<Integer, ResponseFuture> next : this.ackTables.entrySet()) {
+            ResponseFuture responseFuture = next.getValue();
+
+            if ((responseFuture.getBeginTimestamp() + responseFuture.getTimeoutMillis()) <= System.currentTimeMillis()) {
+                rList.add(responseFuture.getRequestId());
+            }
+        }
+
+        for (Integer requestID: rList) {
+            ResponseFuture rf = this.ackTables.remove(requestID);
+
+            if (rf != null) {
+                LOG.warn("remove timeout request {} ", rf);
+                rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis()));
+                executeAsyncHandler(rf);
             }
         }
     }
@@ -167,9 +167,6 @@
                 extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
 
             if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
-                interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE,
-                    extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
-
                 RemotingCommand response = remotingCommandFactory.createResponse(cmd);
                 response.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
                 response.remark("SYSTEM_BUSY");
@@ -178,49 +175,23 @@
         }
     }
 
-    private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
-        final ResponseResult responseResult = ackTables.get(cmd.requestID());
-        if (responseResult != null) {
-            responseResult.setResponseCommand(cmd);
-            responseResult.release();
+    private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand response) {
+        final ResponseFuture responseFuture = ackTables.remove(response.requestID());
+        if (responseFuture != null) {
+            responseFuture.setResponseCommand(response);
+            responseFuture.release();
 
-            ackTables.remove(cmd.requestID());
+            this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
+                extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response));
 
-            if (responseResult.getAsyncHandler() != null) {
-                boolean sameThread = false;
-                ExecutorService executor = this.getCallbackExecutor();
-                if (executor != null) {
-                    try {
-                        executor.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    responseResult.executeCallbackArrived(responseResult.getResponseCommand());
-                                } catch (Throwable e) {
-                                    LOG.warn("Execute callback error !", e);
-                                }
-                            }
-                        });
-                    } catch (RejectedExecutionException e) {
-                        sameThread = true;
-                        LOG.warn("Execute submit error !", e);
-                    }
-                } else {
-                    sameThread = true;
-                }
-
-                if (sameThread) {
-                    try {
-                        responseResult.executeCallbackArrived(responseResult.getResponseCommand());
-                    } catch (Throwable e) {
-                        LOG.warn("Execute callback in response thread error !", e);
-                    }
-                }
+            if (responseFuture.getAsyncHandler() != null) {
+                executeAsyncHandler(responseFuture);
             } else {
-                responseResult.putResponse(cmd);
+                responseFuture.putResponse(response);
+                responseFuture.release();
             }
         } else {
-            LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel()));
+            LOG.warn("request {} from {} has not matched response !", response, extractRemoteAddress(ctx.channel()));
         }
     }
 
@@ -261,6 +232,60 @@
         return this.publicExecutor;
     }
 
+    /**
+     * Execute callback in callback executor. If callback executor is null, run directly in current thread
+     */
+    private void executeAsyncHandler(final ResponseFuture responseFuture) {
+        boolean runInThisThread = false;
+        ExecutorService executor = this.getCallbackExecutor();
+        if (executor != null) {
+            try {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            responseFuture.executeAsyncHandler();
+                        } catch (Throwable e) {
+                            LOG.warn("execute callback in executor exception, and callback throw", e);
+                        } finally {
+                            responseFuture.release();
+                        }
+                    }
+                });
+            } catch (Throwable e) {
+                runInThisThread = true;
+                LOG.warn("execute callback in executor exception, maybe executor busy", e);
+            }
+        } else {
+            runInThisThread = true;
+        }
+
+        if (runInThisThread) {
+            try {
+                responseFuture.executeAsyncHandler();
+            } catch (Throwable e) {
+                LOG.warn("executeInvokeCallback Exception", e);
+            } finally {
+                responseFuture.release();
+            }
+        }
+    }
+
+    private void requestFail(final int requestID, final Throwable cause) {
+        ResponseFuture responseFuture = ackTables.remove(requestID);
+        if (responseFuture != null) {
+            responseFuture.setSendRequestOK(false);
+            responseFuture.putResponse(null);
+            responseFuture.setCause(cause);
+            executeAsyncHandler(responseFuture);
+        }
+    }
+
+    private void requestFail(final ResponseFuture responseFuture, final Throwable cause) {
+        responseFuture.setCause(cause);
+        executeAsyncHandler(responseFuture);
+    }
+
     private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
         if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
             if (response != null) {
@@ -277,8 +302,10 @@
 
     private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
         if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
-            //FiXME Exception interceptor can not throw exception
-            interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
+            RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+            response.opCode(RemotingSysResponseCode.SYSTEM_ERROR);
+            response.remark("SYSTEM_ERROR");
+            writeAndFlush(ctx.channel(), response);
         }
     }
 
@@ -288,7 +315,6 @@
 
         final String remoteAddr = extractRemoteAddress(channel);
 
-        //FIXME try catch here
         this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
 
         RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis);
@@ -302,27 +328,25 @@
     private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request,
         final long timeoutMillis) {
         try {
-            final int opaque = request.requestID();
-            final ResponseResult responseResult = new ResponseResult(opaque, timeoutMillis);
-            responseResult.setRequestCommand(request);
-            //FIXME one interceptor for all case ?
-            responseResult.setInterceptorGroup(this.interceptorGroup);
-            responseResult.setRemoteAddr(remoteAddr);
+            final int requestID = request.requestID();
+            final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis);
+            responseFuture.setRequestCommand(request);
+            responseFuture.setRemoteAddr(remoteAddr);
 
-            this.ackTables.put(opaque, responseResult);
+            this.ackTables.put(requestID, responseFuture);
 
             ChannelFutureListener listener = new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture f) throws Exception {
                     if (f.isSuccess()) {
-                        responseResult.setSendRequestOK(true);
+                        responseFuture.setSendRequestOK(true);
                         return;
                     } else {
-                        responseResult.setSendRequestOK(false);
+                        responseFuture.setSendRequestOK(false);
 
-                        ackTables.remove(opaque);
-                        responseResult.setCause(f.cause());
-                        responseResult.putResponse(null);
+                        ackTables.remove(requestID);
+                        responseFuture.setCause(f.cause());
+                        responseFuture.putResponse(null);
 
                         LOG.warn("Send request command to {} failed !", remoteAddr);
                     }
@@ -331,14 +355,14 @@
 
             this.writeAndFlush(channel, request, listener);
 
-            RemotingCommand responseCommand = responseResult.waitResponse(timeoutMillis);
+            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
 
             if (null == responseCommand) {
-                if (responseResult.isSendRequestOK()) {
-                    throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause());
+                if (responseFuture.isSendRequestOK()) {
+                    throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
                 }
                 else {
-                    throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause());
+                    throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause());
                 }
             }
 
@@ -360,98 +384,57 @@
 
         this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
 
-        Exception exception = null;
-
-        try {
-            this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback);
-        } catch (InterruptedException e) {
-            exception = e;
-        } finally {
-            if (null != exception) {
-                try {
-                    this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION"));
-                } catch (Throwable e) {
-                    LOG.warn("onException ", e);
-                }
-            }
-        }
+        this.invokeAsync0(remoteAddr, channel, request, invokeCallback, timeoutMillis);
     }
 
     private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request,
-        final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException {
-        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        final AsyncHandler asyncHandler, final long timeoutMillis) {
+        boolean acquired = this.semaphoreAsync.tryAcquire();
         if (acquired) {
             final int requestID = request.requestID();
 
             SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
 
-            final ResponseResult responseResult = new ResponseResult(request.requestID(), timeoutMillis, invokeCallback, once);
-            responseResult.setRequestCommand(request);
-            responseResult.setInterceptorGroup(this.interceptorGroup);
-            responseResult.setRemoteAddr(remoteAddr);
+            final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis, asyncHandler, once);
+            responseFuture.setRequestCommand(request);
+            responseFuture.setRemoteAddr(remoteAddr);
 
-            this.ackTables.put(request.requestID(), responseResult);
+            this.ackTables.put(requestID, responseFuture);
             try {
                 ChannelFutureListener listener = new ChannelFutureListener() {
                     @Override
-                    public void operationComplete(ChannelFuture f) throws Exception {
-                        responseResult.setSendRequestOK(f.isSuccess());
+                    public void operationComplete(ChannelFuture f) {
+                        responseFuture.setSendRequestOK(f.isSuccess());
                         if (f.isSuccess()) {
                             return;
                         }
 
-                        responseResult.putResponse(null);
-                        ackTables.remove(requestID);
-                        try {
-                            responseResult.executeRequestSendFailed();
-                        } catch (Throwable e) {
-                            LOG.warn("Execute callback error !", e);
-                        } finally {
-                            responseResult.release();
-                        }
-
+                        requestFail(requestID, f.cause());
                         LOG.warn("Send request command to channel  failed.", remoteAddr);
                     }
                 };
 
                 this.writeAndFlush(channel, request, listener);
             } catch (Exception e) {
-                responseResult.release();
+                requestFail(requestID, e);
                 LOG.error("Send request command to channel " + channel + " error !", e);
             }
         } else {
-            String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
-                timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
+            String info = String.format("No available async semaphore to issue the request request %s", request.toString());
+            requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new RemoteAccessException(info));
             LOG.error(info);
-            throw new RemoteTimeoutException(info);
         }
     }
 
-    public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) {
+    public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) {
         request.trafficType(TrafficType.REQUEST_ONEWAY);
 
         this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
-
-        Exception exception = null;
-
-        try {
-            this.invokeOneway0(channel, request, timeoutMillis);
-        } catch (InterruptedException e) {
-            exception = e;
-        } finally {
-            if (null != exception) {
-                try {
-                    this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION"));
-                } catch (Throwable e) {
-                    LOG.warn("onException ", e);
-                }
-            }
-        }
+        this.invokeOneway0(channel, request);
     }
 
-    private void invokeOneway0(final Channel channel, final RemotingCommand request,
-        final long timeoutMillis) throws InterruptedException {
-        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+    private void invokeOneway0(final Channel channel, final RemotingCommand request) {
+        boolean acquired = this.semaphoreOneway.tryAcquire();
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
             try {
@@ -459,7 +442,7 @@
 
                 ChannelFutureListener listener = new ChannelFutureListener() {
                     @Override
-                    public void operationComplete(ChannelFuture f) throws Exception {
+                    public void operationComplete(ChannelFuture f) {
                         once.release();
                         if (!f.isSuccess()) {
                             LOG.warn("Send request command to channel {} failed !", socketAddress);
@@ -473,10 +456,8 @@
                 LOG.error("Send request command to channel " + channel + " error !", e);
             }
         } else {
-            String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
-                timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
+            String info = String.format("No available oneway semaphore to issue the request %s", request.toString());
             LOG.error(info);
-            throw new RemoteTimeoutException(info);
         }
     }
 
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index b9f9a64..f098de3 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -351,11 +351,6 @@
 
         final Channel channel = this.createIfAbsent(address);
         if (channel != null && channel.isActive()) {
-            // We support Netty's channel-level backpressure thereby respecting slow receivers on the other side.
-            if (!channel.isWritable()) {
-                // Note: It's up to the layer above a transport to decide whether or not to requeue a canceled write.
-                LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
-            }
             this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
         } else {
             this.closeChannel(address, channel);
@@ -363,15 +358,10 @@
     }
 
     @Override
-    public void invokeOneWay(final String address, final RemotingCommand request, final long timeoutMillis) {
+    public void invokeOneWay(final String address, final RemotingCommand request) {
         final Channel channel = this.createIfAbsent(address);
         if (channel != null && channel.isActive()) {
-            if (!channel.isWritable()) {
-                //if (this.clientConfig.isSocketFlowControl()) {
-                LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
-                //throw new ServiceInvocationFailureException(String.format("Channel[%s] is not writable now", channel.toString()));
-            }
-            this.invokeOnewayWithInterceptor(channel, request, timeoutMillis);
+            this.invokeOnewayWithInterceptor(channel, request);
         } else {
             this.closeChannel(address, channel);
         }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 60aca5e..40e3cb7 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -195,9 +195,8 @@
     }
 
     @Override
-    public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request,
-        final long timeoutMillis) {
-        invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request, timeoutMillis);
+    public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request) {
+        invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request);
     }
 
     private class ServerConnectionHandler extends ChannelDuplexHandler {