[ISSUE #7410] Handle the Exception when the Proxy requests the client
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 40946ca..d755fdc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
@@ -158,10 +159,15 @@
if (response.getCode() == ResponseCode.SUCCESS) {
ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo));
+ } else {
+ String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark());
+ RuntimeException e = new RuntimeException(errMsg);
+ responseFuture.completeExceptionally(e);
}
- String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark());
- RuntimeException e = new RuntimeException(errMsg);
- responseFuture.completeExceptionally(e);
+ })
+ .exceptionally(t -> {
+ responseFuture.completeExceptionally(ExceptionUtils.getRealException(t));
+ return null;
});
return CompletableFuture.completedFuture(null);
} catch (Throwable t) {
@@ -183,10 +189,15 @@
if (response.getCode() == ResponseCode.SUCCESS) {
ConsumeMessageDirectlyResult result = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class);
responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", result));
+ } else {
+ String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
+ RuntimeException e = new RuntimeException(errMsg);
+ responseFuture.completeExceptionally(e);
}
- String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
- RuntimeException e = new RuntimeException(errMsg);
- responseFuture.completeExceptionally(e);
+ })
+ .exceptionally(t -> {
+ responseFuture.completeExceptionally(ExceptionUtils.getRealException(t));
+ return null;
});
return CompletableFuture.completedFuture(null);
} catch (Throwable t) {