[fix][broker] Fix potential exception cause the policy service init fail. (#19746)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index bbe4d26..f01ee7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -312,20 +312,19 @@
return;
}
if (hasMore) {
- reader.readNextAsync().whenComplete((msg, e) -> {
- if (e != null) {
- log.error("[{}] Failed to read event from the system topic.",
- reader.getSystemTopic().getTopicName(), e);
- future.completeExceptionally(e);
- cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
- return;
- }
+ reader.readNextAsync().thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
}
initPolicesCache(reader, future);
+ }).exceptionally(e -> {
+ log.error("[{}] Failed to read event from the system topic.",
+ reader.getSystemTopic().getTopicName(), e);
+ future.completeExceptionally(e);
+ cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+ return null;
});
} else {
if (log.isDebugEnabled()) {