[fix][broker] fix MessageDeduplication throw NPE when enable broker dedup and set namespace disable deduplication. (#20905)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index ed4e70b..490be4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -482,6 +482,10 @@
     }
 
     public void takeSnapshot() {
+        if (!isEnabled()) {
+            return;
+        }
+
         Integer interval = topic.getHierarchyTopicPolicies().getDeduplicationSnapshotIntervalSeconds().get();
         long currentTimeStamp = System.currentTimeMillis();
         if (interval == null || interval <= 0
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index e57092d..16721ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -492,6 +492,43 @@
 
     }
 
+    @Test(timeOut = 30000)
+    public void testDisableNamespacePolicyTakeSnapshotShouldNotThrowException() throws Exception {
+        cleanup();
+        conf.setBrokerDeduplicationEnabled(true);
+        conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
+        conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
+        conf.setBrokerDeduplicationEntriesInterval(20000);
+        setup();
+
+        final String topicName = testTopic + UUID.randomUUID().toString();
+        final String producerName = "my-producer";
+        @Cleanup
+        Producer<String> producer = pulsarClient
+                .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
+
+        // disable deduplication
+        admin.namespaces().setDeduplicationStatus(myNamespace, false);
+
+        int msgNum = 50;
+        CountDownLatch countDownLatch = new CountDownLatch(msgNum);
+        for (int i = 0; i < msgNum; i++) {
+            producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
+        }
+        countDownLatch.await();
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicIfExists(topicName).get().get();
+        ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
+
+        // when disable topic deduplication the cursor should be deleted.
+        assertNull(managedCursor);
+
+        // this method will be called at brokerService forEachTopic.
+        // if topic level disable deduplication.
+        // this method should be skipped without throw exception.
+        persistentTopic.checkDeduplicationSnapshot();
+    }
+
     private void waitCacheInit(String topicName) throws Exception {
         pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);