[Broker] Fix NPE when subscription is already removed (#14363)
* [Broker] Fix NPE when subscription is already removed
* Cover same case for NonPersistentTopic
Master Issue: #14362
### Motivation
There is current a race condition when we remove a subscription. The race and how to reproduce it is described in the #14362. One of the consequences of the race is that there is a chance we try to remove the subscription from the topic twice. This leads to an NPE, as described in the issue.
### Modifications
* Verify that the `sub` is not null before getting its stats.
### Verifying this change
This is a trivial change.
(cherry picked from commit aee1e7dbc55099c6b7cdc49e7b5e1c4cd66994ce)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 71f1764..d242ed3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -1063,10 +1063,12 @@
// That creates deadlock. so, execute remove it in different thread.
return CompletableFuture.runAsync(() -> {
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
- // preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats();
- bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
- msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ if (sub != null) {
+ // preserve accumulative stats form removed subscription
+ SubscriptionStatsImpl stats = sub.getStats();
+ bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+ msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ }
}, brokerService.executor());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 235ea52..bc06ea4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1050,10 +1050,12 @@
void removeSubscription(String subscriptionName) {
PersistentSubscription sub = subscriptions.remove(subscriptionName);
- // preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats(false, false, false);
- bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
- msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ if (sub != null) {
+ // preserve accumulative stats form removed subscription
+ SubscriptionStatsImpl stats = sub.getStats(false, false, false);
+ bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+ msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+ }
}
/**