Move duplicate code to abstract parent class (#10061)

* Move duplicate code to abstract parent class

* code style
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 02305e4..d57fc7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -19,20 +19,26 @@
 
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import io.netty.buffer.ByteBuf;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 
@@ -148,6 +154,39 @@
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
     }
 
+    /**
+     * Determine whether the number of consumers on the subscription reaches the threshold.
+     * @return
+     */
+    protected abstract boolean isConsumersExceededOnSubscription();
+
+    protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
+                                                        String topic, int consumerSize) {
+        Policies policies = null;
+        Integer maxConsumersPerSubscription = null;
+        try {
+            maxConsumersPerSubscription = Optional.ofNullable(brokerService
+                    .getTopicPolicies(TopicName.get(topic)))
+                    .map(TopicPolicies::getMaxConsumersPerSubscription)
+                    .orElse(null);
+            if (maxConsumersPerSubscription == null) {
+                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
+                policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
+            }
+        } catch (Exception e) {
+            log.debug("Get topic or namespace policies fail", e);
+        }
+
+        if (maxConsumersPerSubscription == null) {
+            maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
+                    ? policies.max_consumers_per_subscription :
+                    brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
+        }
+
+        return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
+    }
+
     private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
         // Remove the protobuf headers
         Commands.skipMessageMetadata(headersAndPayload);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 9eb5a2e..b142c51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -70,8 +70,6 @@
 
     protected abstract void cancelPendingRead();
 
-    protected abstract boolean isConsumersExceededOnSubscription();
-
     protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
         if (null != activeConsumer && subscriptionType == SubType.Failover) {
             consumers.forEach(consumer ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 00584cf..b6f4ca7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -86,7 +86,8 @@
         consumerSet.add(consumer);
     }
 
-    private boolean isConsumersExceededOnSubscription() {
+    @Override
+    protected boolean isConsumersExceededOnSubscription() {
         final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
         if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
             return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index cecdbaf..a720767 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -18,13 +18,10 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import java.util.List;
-import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -33,9 +30,6 @@
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 
@@ -79,37 +73,9 @@
         }
     }
 
+    @Override
     protected boolean isConsumersExceededOnSubscription() {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
-                    .getTopicPolicies(TopicName.get(topicName)))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            }
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
-                    ? policies.max_consumers_per_subscription :
-                    serviceConfig.getMaxConsumersPerSubscription();
-        }
-
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
-            return true;
-        }
-        return false;
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index eec0f93..2c05644 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
@@ -40,7 +39,6 @@
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -59,10 +57,8 @@
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
 import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -158,37 +154,9 @@
         consumerSet.add(consumer);
     }
 
-    private boolean isConsumersExceededOnSubscription() {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
-                    .getTopicPolicies(TopicName.get(topic.getName())))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and
-                // prevent deadlocks in addConsumer
-                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            }
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
-                    ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
-        }
-
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
-            return true;
-        }
-        return false;
+    @Override
+    protected boolean isConsumersExceededOnSubscription() {
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6e28a3c..93c2d25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import java.util.Iterator;
 import java.util.List;
@@ -36,7 +35,6 @@
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -50,10 +48,8 @@
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,36 +128,9 @@
         }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
     }
 
+    @Override
     protected boolean isConsumersExceededOnSubscription() {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
-                    .getTopicPolicies(TopicName.get(topicName)))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-                policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
-
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            }
-        } catch (Exception e) {
-            policies = new Policies();
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
-                    ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
-        }
-
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
-            return true;
-        }
-        return false;
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
     }
 
     @Override