[fix][broker] fix MessageDeduplication throw NPE when enable broker dedup and set namespace disable deduplication. (#20905)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ed4e70b..490be4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -482,6 +482,10 @@
}
public void takeSnapshot() {
+ if (!isEnabled()) {
+ return;
+ }
+
Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
long currentTimeStamp = System.currentTimeMillis();
if (interval == null || interval <= 0
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index e57092d..16721ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -492,6 +492,43 @@
}
+ @Test(timeOut = 30000)
+ public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() throws Exception {
+ cleanup();
+ conf.setBrokerDeduplicationEnabled(true);
+ conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
+ conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
+ conf.setBrokerDeduplicationEntriesInterval(20000);
+ setup();
+
+ final String topicName = testTopic + UUID.randomUUID().toString();
+ final String producerName = "my-producer";
+ @Cleanup
+ Producer<String> producer = pulsarClient
+ .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
+
+ // disable deduplication
+ admin.namespaces().setDeduplicationStatus(myNamespace, false);
+
+ int msgNum = 50;
+ CountDownLatch countDownLatch = new CountDownLatch(msgNum);
+ for (int i = 0; i < msgNum; i++) {
+ producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
+ }
+ countDownLatch.await();
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+ .getTopicIfExists(topicName).get().get();
+ ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
+
+ // when disable topic deduplication the cursor should be deleted.
+ assertNull(managedCursor);
+
+ // this method will be called at brokerService forEachTopic.
+ // if topic level disable deduplication.
+ // this method should be skipped without throw exception.
+ persistentTopic.checkDeduplicationSnapshot();
+ }
+
private void waitCacheInit(String topicName) throws Exception {
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);