make ctx constructed in scheduleRenewTask (#8556)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 3948824..0cb5193 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -159,7 +159,8 @@
if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
return;
}
- renewalWorkerService.submit(() -> renewMessage(key, group, msgID, handleStr));
+ renewalWorkerService.submit(() -> renewMessage(createContext("RenewMessage"), key, group,
+ msgID, handleStr));
});
}
} catch (Exception e) {
@@ -169,15 +170,15 @@
log.debug("scan for renewal done. cost:{}ms", stopwatch.elapsed().toMillis());
}
- protected void renewMessage(ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
+ protected void renewMessage(ProxyContext context, ReceiptHandleGroupKey key, ReceiptHandleGroup group, String msgID, String handleStr) {
try {
- group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(key, messageReceiptHandle));
+ group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> startRenewMessage(context, key, messageReceiptHandle));
} catch (Exception e) {
log.error("error when renew message. msgID:{}, handleStr:{}", msgID, handleStr, e);
}
}
- protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
+ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(ProxyContext context, ReceiptHandleGroupKey key, MessageReceiptHandle messageReceiptHandle) {
CompletableFuture<MessageReceiptHandle> resFuture = new CompletableFuture<>();
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
long current = System.currentTimeMillis();
@@ -209,7 +210,6 @@
}
});
} else {
- ProxyContext context = createContext("RenewMessage");
SubscriptionGroupConfig subscriptionGroupConfig =
metadataService.getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
if (subscriptionGroupConfig == null) {