Polish interceptor and async handler
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java
index 106431b..322503c 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java
@@ -26,9 +26,7 @@
* @since 1.0.0
*/
public interface AsyncHandler {
- void onFailure(RemotingCommand command);
+ void onFailure(RemotingCommand request, Throwable cause);
- void onSuccess(RemotingCommand command);
-
- void onTimeout(long costTimeMillis, long timeoutMillis);
+ void onSuccess(RemotingCommand response);
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java
index 1603af4..ee67272 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java
@@ -24,5 +24,5 @@
void invokeAsync(String address, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis);
- void invokeOneWay(String address, RemotingCommand request, long timeoutMillis);
+ void invokeOneWay(String address, RemotingCommand request);
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java
index f36c83c..785f83e 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java
@@ -25,8 +25,7 @@
RemotingCommand invoke(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis);
- void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler,
- long timeoutMillis);
+ void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis);
- void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis);
+ void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request);
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
index 57f3743..9cb59b4 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java
@@ -25,14 +25,13 @@
public interface RemotingService extends ConnectionService, ObjectLifecycle {
void registerInterceptor(Interceptor interceptor);
- void registerRequestProcessor(final short requestCode, final RequestProcessor processor,
- final ExecutorService executor);
+ void registerRequestProcessor(short requestCode, RequestProcessor processor, ExecutorService executor);
- void registerRequestProcessor(final short requestCode, final RequestProcessor processor);
+ void registerRequestProcessor(short requestCode, RequestProcessor processor);
- void unregisterRequestProcessor(final short requestCode);
+ void unregisterRequestProcessor(short requestCode);
- Pair<RequestProcessor, ExecutorService> processor(final short requestCode);
+ Pair<RequestProcessor, ExecutorService> processor(short requestCode);
String remotingInstanceId();
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
deleted file mode 100644
index 2452309..0000000
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.api.interceptor;
-
-import org.apache.rocketmq.remoting.api.RemotingEndPoint;
-import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-
-public class ExceptionContext extends RequestContext {
- private Throwable exception;
- private String remark;
-
- public ExceptionContext(RemotingEndPoint remotingEndPoint, String remoteAddr, RemotingCommand request,
- Throwable exception, String remark) {
- super(remotingEndPoint, remoteAddr, request);
- this.remotingEndPoint = remotingEndPoint;
- this.remoteAddr = remoteAddr;
- this.request = request;
- this.exception = exception;
- this.remark = remark;
- }
-
- public RemotingEndPoint getRemotingEndPoint() {
- return remotingEndPoint;
- }
-
- public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
- this.remotingEndPoint = remotingEndPoint;
- }
-
- public String getRemoteAddr() {
- return remoteAddr;
- }
-
- public void setRemoteAddr(String remoteAddr) {
- this.remoteAddr = remoteAddr;
- }
-
- public RemotingCommand getRequest() {
- return request;
- }
-
- public void setRequest(RemotingCommand request) {
- this.request = request;
- }
-
- public Throwable getException() {
- return exception;
- }
-
- public void setException(Throwable exception) {
- this.exception = exception;
- }
-
- public String getRemark() {
- return remark;
- }
-
- public void setRemark(String remark) {
- this.remark = remark;
- }
-}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
index 62257ef..98a04cb 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java
@@ -18,9 +18,7 @@
package org.apache.rocketmq.remoting.api.interceptor;
public interface Interceptor {
- void beforeRequest(final RequestContext context);
+ void beforeRequest(RequestContext context);
- void afterResponseReceived(final ResponseContext context);
-
- void onException(final ExceptionContext context);
+ void afterResponseReceived(ResponseContext context);
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
index 9ffc696..e7baeed 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java
@@ -21,7 +21,7 @@
import java.util.List;
public class InterceptorGroup {
- private final List<Interceptor> interceptors = new ArrayList<Interceptor>();
+ private final List<Interceptor> interceptors = new ArrayList<>();
public void registerInterceptor(final Interceptor interceptor) {
if (interceptor != null) {
@@ -40,10 +40,4 @@
interceptor.afterResponseReceived(context);
}
}
-
- public void onException(final ExceptionContext context) {
- for (Interceptor interceptor : interceptors) {
- interceptor.onException(context);
- }
- }
}
diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
index c7f7a9b..005aa28 100644
--- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
+++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java
@@ -34,30 +34,6 @@
this.response = response;
}
- public RemotingEndPoint getRemotingEndPoint() {
- return remotingEndPoint;
- }
-
- public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) {
- this.remotingEndPoint = remotingEndPoint;
- }
-
- public String getRemoteAddr() {
- return remoteAddr;
- }
-
- public void setRemoteAddr(String remoteAddr) {
- this.remoteAddr = remoteAddr;
- }
-
- public RemotingCommand getRequest() {
- return request;
- }
-
- public void setRequest(RemotingCommand request) {
- this.request = request;
- }
-
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
similarity index 67%
rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
index 92f501f..e614963 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java
@@ -23,16 +23,14 @@
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.rocketmq.remoting.api.AsyncHandler;
-import org.apache.rocketmq.remoting.api.RemotingEndPoint;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
-import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
-import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.jetbrains.annotations.Nullable;
-public class ResponseResult {
+public class ResponseFuture {
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);
- private final AtomicBoolean interceptorExecuted = new AtomicBoolean(false);
+ private final AtomicBoolean asyncHandlerExecuted = new AtomicBoolean(false);
private int requestId;
private long timeoutMillis;
@@ -47,54 +45,27 @@
private InterceptorGroup interceptorGroup;
private String remoteAddr;
- public ResponseResult(int requestId, long timeoutMillis, AsyncHandler asyncHandler, SemaphoreReleaseOnlyOnce once) {
+ public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler, @Nullable SemaphoreReleaseOnlyOnce once) {
this.requestId = requestId;
this.timeoutMillis = timeoutMillis;
this.asyncHandler = asyncHandler;
this.once = once;
}
- public ResponseResult(int requestId, long timeoutMillis) {
+ public ResponseFuture(int requestId, long timeoutMillis) {
this.requestId = requestId;
this.timeoutMillis = timeoutMillis;
}
- public void executeRequestSendFailed() {
- if (this.interceptorExecuted.compareAndSet(false, true)) {
- try {
- interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
- cause, "REQUEST_SEND_FAILED"));
- } catch (Throwable e) {
- }
- //Sync call
- if (null != asyncHandler) {
- asyncHandler.onFailure(requestCommand);
- }
- }
- }
-
- public void executeCallbackArrived(final RemotingCommand response) {
- if (this.interceptorExecuted.compareAndSet(false, true)) {
- try {
- interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, this.remoteAddr,
- this.requestCommand, response));
- } catch (Throwable e) {
- }
- if (null != asyncHandler) {
- asyncHandler.onSuccess(response);
- }
- }
- }
-
- public void onTimeout(long costTimeMillis, long timoutMillis) {
- if (this.interceptorExecuted.compareAndSet(false, true)) {
- try {
- interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand,
- null, "CALLBACK_TIMEOUT"));
- } catch (Throwable ignore) {
- }
- if (null != asyncHandler) {
- asyncHandler.onTimeout(costTimeMillis, timoutMillis);
+ public void executeAsyncHandler() {
+ if (asyncHandler != null) {
+ if (this.asyncHandlerExecuted.compareAndSet(false, true)) {
+ if (cause != null) {
+ asyncHandler.onFailure(requestCommand, cause);
+ } else {
+ assert responseCommand != null;
+ asyncHandler.onSuccess(responseCommand);
+ }
}
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index cbb211e..ac989f8 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -24,7 +24,8 @@
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -44,14 +45,13 @@
import org.apache.rocketmq.remoting.api.command.TrafficType;
import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.ResponseResult;
+import org.apache.rocketmq.remoting.common.ResponseFuture;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.config.RemotingConfig;
import org.apache.rocketmq.remoting.external.ThreadUtils;
@@ -68,7 +68,7 @@
protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
private final Semaphore semaphoreOneway;
private final Semaphore semaphoreAsync;
- private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
+ private final Map<Integer, ResponseFuture> ackTables = new ConcurrentHashMap<Integer, ResponseFuture>(256);
private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>();
private final RemotingCommandFactory remotingCommandFactory;
private final String remotingInstanceId = UIDGenerator.instance().createUID();
@@ -101,23 +101,23 @@
}
void scanResponseTable() {
- Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, ResponseResult> next = iterator.next();
- ResponseResult result = next.getValue();
+ final List<Integer> rList = new ArrayList<>();
- if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
- iterator.remove();
- try {
- long timeoutMillis = result.getTimeoutMillis();
- long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
- result.onTimeout(timeoutMillis, costTimeMillis);
- } catch (Throwable e) {
- LOG.warn("Error occurred when execute timeout callback !", e);
- } finally {
- result.release();
- LOG.warn("Removed timeout request {} ", result);
- }
+ for (final Map.Entry<Integer, ResponseFuture> next : this.ackTables.entrySet()) {
+ ResponseFuture responseFuture = next.getValue();
+
+ if ((responseFuture.getBeginTimestamp() + responseFuture.getTimeoutMillis()) <= System.currentTimeMillis()) {
+ rList.add(responseFuture.getRequestId());
+ }
+ }
+
+ for (Integer requestID: rList) {
+ ResponseFuture rf = this.ackTables.remove(requestID);
+
+ if (rf != null) {
+ LOG.warn("remove timeout request {} ", rf);
+ rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis()));
+ executeAsyncHandler(rf);
}
}
}
@@ -167,9 +167,6 @@
extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
- interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE,
- extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
-
RemotingCommand response = remotingCommandFactory.createResponse(cmd);
response.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
response.remark("SYSTEM_BUSY");
@@ -178,49 +175,23 @@
}
}
- private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
- final ResponseResult responseResult = ackTables.get(cmd.requestID());
- if (responseResult != null) {
- responseResult.setResponseCommand(cmd);
- responseResult.release();
+ private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand response) {
+ final ResponseFuture responseFuture = ackTables.remove(response.requestID());
+ if (responseFuture != null) {
+ responseFuture.setResponseCommand(response);
+ responseFuture.release();
- ackTables.remove(cmd.requestID());
+ this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
+ extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response));
- if (responseResult.getAsyncHandler() != null) {
- boolean sameThread = false;
- ExecutorService executor = this.getCallbackExecutor();
- if (executor != null) {
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- responseResult.executeCallbackArrived(responseResult.getResponseCommand());
- } catch (Throwable e) {
- LOG.warn("Execute callback error !", e);
- }
- }
- });
- } catch (RejectedExecutionException e) {
- sameThread = true;
- LOG.warn("Execute submit error !", e);
- }
- } else {
- sameThread = true;
- }
-
- if (sameThread) {
- try {
- responseResult.executeCallbackArrived(responseResult.getResponseCommand());
- } catch (Throwable e) {
- LOG.warn("Execute callback in response thread error !", e);
- }
- }
+ if (responseFuture.getAsyncHandler() != null) {
+ executeAsyncHandler(responseFuture);
} else {
- responseResult.putResponse(cmd);
+ responseFuture.putResponse(response);
+ responseFuture.release();
}
} else {
- LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel()));
+ LOG.warn("request {} from {} has not matched response !", response, extractRemoteAddress(ctx.channel()));
}
}
@@ -261,6 +232,60 @@
return this.publicExecutor;
}
+ /**
+ * Execute callback in callback executor. If callback executor is null, run directly in current thread
+ */
+ private void executeAsyncHandler(final ResponseFuture responseFuture) {
+ boolean runInThisThread = false;
+ ExecutorService executor = this.getCallbackExecutor();
+ if (executor != null) {
+ try {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ responseFuture.executeAsyncHandler();
+ } catch (Throwable e) {
+ LOG.warn("execute callback in executor exception, and callback throw", e);
+ } finally {
+ responseFuture.release();
+ }
+ }
+ });
+ } catch (Throwable e) {
+ runInThisThread = true;
+ LOG.warn("execute callback in executor exception, maybe executor busy", e);
+ }
+ } else {
+ runInThisThread = true;
+ }
+
+ if (runInThisThread) {
+ try {
+ responseFuture.executeAsyncHandler();
+ } catch (Throwable e) {
+ LOG.warn("executeInvokeCallback Exception", e);
+ } finally {
+ responseFuture.release();
+ }
+ }
+ }
+
+ private void requestFail(final int requestID, final Throwable cause) {
+ ResponseFuture responseFuture = ackTables.remove(requestID);
+ if (responseFuture != null) {
+ responseFuture.setSendRequestOK(false);
+ responseFuture.putResponse(null);
+ responseFuture.setCause(cause);
+ executeAsyncHandler(responseFuture);
+ }
+ }
+
+ private void requestFail(final ResponseFuture responseFuture, final Throwable cause) {
+ responseFuture.setCause(cause);
+ executeAsyncHandler(responseFuture);
+ }
+
private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
if (response != null) {
@@ -277,8 +302,10 @@
private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
- //FiXME Exception interceptor can not throw exception
- interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
+ RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+ response.opCode(RemotingSysResponseCode.SYSTEM_ERROR);
+ response.remark("SYSTEM_ERROR");
+ writeAndFlush(ctx.channel(), response);
}
}
@@ -288,7 +315,6 @@
final String remoteAddr = extractRemoteAddress(channel);
- //FIXME try catch here
this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis);
@@ -302,27 +328,25 @@
private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request,
final long timeoutMillis) {
try {
- final int opaque = request.requestID();
- final ResponseResult responseResult = new ResponseResult(opaque, timeoutMillis);
- responseResult.setRequestCommand(request);
- //FIXME one interceptor for all case ?
- responseResult.setInterceptorGroup(this.interceptorGroup);
- responseResult.setRemoteAddr(remoteAddr);
+ final int requestID = request.requestID();
+ final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis);
+ responseFuture.setRequestCommand(request);
+ responseFuture.setRemoteAddr(remoteAddr);
- this.ackTables.put(opaque, responseResult);
+ this.ackTables.put(requestID, responseFuture);
ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
- responseResult.setSendRequestOK(true);
+ responseFuture.setSendRequestOK(true);
return;
} else {
- responseResult.setSendRequestOK(false);
+ responseFuture.setSendRequestOK(false);
- ackTables.remove(opaque);
- responseResult.setCause(f.cause());
- responseResult.putResponse(null);
+ ackTables.remove(requestID);
+ responseFuture.setCause(f.cause());
+ responseFuture.putResponse(null);
LOG.warn("Send request command to {} failed !", remoteAddr);
}
@@ -331,14 +355,14 @@
this.writeAndFlush(channel, request, listener);
- RemotingCommand responseCommand = responseResult.waitResponse(timeoutMillis);
+ RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
- if (responseResult.isSendRequestOK()) {
- throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause());
+ if (responseFuture.isSendRequestOK()) {
+ throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
}
else {
- throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause());
+ throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause());
}
}
@@ -360,98 +384,57 @@
this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
- Exception exception = null;
-
- try {
- this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback);
- } catch (InterruptedException e) {
- exception = e;
- } finally {
- if (null != exception) {
- try {
- this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION"));
- } catch (Throwable e) {
- LOG.warn("onException ", e);
- }
- }
- }
+ this.invokeAsync0(remoteAddr, channel, request, invokeCallback, timeoutMillis);
}
private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request,
- final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException {
- boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+ final AsyncHandler asyncHandler, final long timeoutMillis) {
+ boolean acquired = this.semaphoreAsync.tryAcquire();
if (acquired) {
final int requestID = request.requestID();
SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
- final ResponseResult responseResult = new ResponseResult(request.requestID(), timeoutMillis, invokeCallback, once);
- responseResult.setRequestCommand(request);
- responseResult.setInterceptorGroup(this.interceptorGroup);
- responseResult.setRemoteAddr(remoteAddr);
+ final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis, asyncHandler, once);
+ responseFuture.setRequestCommand(request);
+ responseFuture.setRemoteAddr(remoteAddr);
- this.ackTables.put(request.requestID(), responseResult);
+ this.ackTables.put(requestID, responseFuture);
try {
ChannelFutureListener listener = new ChannelFutureListener() {
@Override
- public void operationComplete(ChannelFuture f) throws Exception {
- responseResult.setSendRequestOK(f.isSuccess());
+ public void operationComplete(ChannelFuture f) {
+ responseFuture.setSendRequestOK(f.isSuccess());
if (f.isSuccess()) {
return;
}
- responseResult.putResponse(null);
- ackTables.remove(requestID);
- try {
- responseResult.executeRequestSendFailed();
- } catch (Throwable e) {
- LOG.warn("Execute callback error !", e);
- } finally {
- responseResult.release();
- }
-
+ requestFail(requestID, f.cause());
LOG.warn("Send request command to channel failed.", remoteAddr);
}
};
this.writeAndFlush(channel, request, listener);
} catch (Exception e) {
- responseResult.release();
+ requestFail(requestID, e);
LOG.error("Send request command to channel " + channel + " error !", e);
}
} else {
- String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
- timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
+ String info = String.format("No available async semaphore to issue the request request %s", request.toString());
+ requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new RemoteAccessException(info));
LOG.error(info);
- throw new RemoteTimeoutException(info);
}
}
- public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) {
+ public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) {
request.trafficType(TrafficType.REQUEST_ONEWAY);
this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
-
- Exception exception = null;
-
- try {
- this.invokeOneway0(channel, request, timeoutMillis);
- } catch (InterruptedException e) {
- exception = e;
- } finally {
- if (null != exception) {
- try {
- this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION"));
- } catch (Throwable e) {
- LOG.warn("onException ", e);
- }
- }
- }
+ this.invokeOneway0(channel, request);
}
- private void invokeOneway0(final Channel channel, final RemotingCommand request,
- final long timeoutMillis) throws InterruptedException {
- boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+ private void invokeOneway0(final Channel channel, final RemotingCommand request) {
+ boolean acquired = this.semaphoreOneway.tryAcquire();
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
@@ -459,7 +442,7 @@
ChannelFutureListener listener = new ChannelFutureListener() {
@Override
- public void operationComplete(ChannelFuture f) throws Exception {
+ public void operationComplete(ChannelFuture f) {
once.release();
if (!f.isSuccess()) {
LOG.warn("Send request command to channel {} failed !", socketAddress);
@@ -473,10 +456,8 @@
LOG.error("Send request command to channel " + channel + " error !", e);
}
} else {
- String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
- timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
+ String info = String.format("No available oneway semaphore to issue the request %s", request.toString());
LOG.error(info);
- throw new RemoteTimeoutException(info);
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index b9f9a64..f098de3 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -351,11 +351,6 @@
final Channel channel = this.createIfAbsent(address);
if (channel != null && channel.isActive()) {
- // We support Netty's channel-level backpressure thereby respecting slow receivers on the other side.
- if (!channel.isWritable()) {
- // Note: It's up to the layer above a transport to decide whether or not to requeue a canceled write.
- LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
- }
this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
} else {
this.closeChannel(address, channel);
@@ -363,15 +358,10 @@
}
@Override
- public void invokeOneWay(final String address, final RemotingCommand request, final long timeoutMillis) {
+ public void invokeOneWay(final String address, final RemotingCommand request) {
final Channel channel = this.createIfAbsent(address);
if (channel != null && channel.isActive()) {
- if (!channel.isWritable()) {
- //if (this.clientConfig.isSocketFlowControl()) {
- LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
- //throw new ServiceInvocationFailureException(String.format("Channel[%s] is not writable now", channel.toString()));
- }
- this.invokeOnewayWithInterceptor(channel, request, timeoutMillis);
+ this.invokeOnewayWithInterceptor(channel, request);
} else {
this.closeChannel(address, channel);
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 60aca5e..40e3cb7 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -195,9 +195,8 @@
}
@Override
- public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request,
- final long timeoutMillis) {
- invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request, timeoutMillis);
+ public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request) {
+ invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request);
}
private class ServerConnectionHandler extends ChannelDuplexHandler {