Optimize TopicPolicies#maxConsumersPerSubscription with HierarchyTopicPolicies (#13548)

This is one of the serial topic policy optimization with HierarchyTopicPolicies.

Update topic policy with HierarchyTopicPolicies comes with these benefits:

All topic policy related settings will goes into AbstractTopic#topicPolicies, easier to understand, check, review, and modify.
Unify policy update to AbstractTopic. And easier to find which policy is not applied to non-persistent topic yet. And we can easily add support for it.
Unify policy value with 3 level settings (topic/namespace/broker), and priority with topic > namespace > broker. And it's easier to find which level settings is missing.
All values are updated at creation or by trigger. We can save some resource to update it or recalculate each time we use it.
etc.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 8970ec6..9d0043e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -43,9 +43,6 @@
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 
@@ -251,32 +248,8 @@
      */
     protected abstract boolean isConsumersExceededOnSubscription();
 
-    protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
-                                                        String topic, int consumerSize) {
-        Policies policies = null;
-        Integer maxConsumersPerSubscription = null;
-        try {
-            maxConsumersPerSubscription = brokerService
-                    .getTopicPolicies(TopicName.get(topic))
-                    .map(TopicPolicies::getMaxConsumersPerSubscription)
-                    .orElse(null);
-            if (maxConsumersPerSubscription == null) {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
-                policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                        .getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()).orElse(null);
-            }
-        } catch (Exception e) {
-            log.debug("Get topic or namespace policies fail", e);
-        }
-
-        if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies != null
-                    && policies.max_consumers_per_subscription != null
-                    && policies.max_consumers_per_subscription >= 0
-                    ? policies.max_consumers_per_subscription :
-                    brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
-        }
-
+    protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
+        Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
         return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 56245a6..9d26e30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -159,6 +159,7 @@
         topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
         topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
         topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
+        topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
         topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
         topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
         topicPolicies.getSubscriptionTypesEnabled().updateTopicValue(
@@ -184,6 +185,8 @@
         topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
         topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
         topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+        topicPolicies.getMaxConsumersPerSubscription()
+                .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
         topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
         topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
         topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
@@ -204,6 +207,7 @@
         topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
         topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
         topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
+        topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
         topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
         //init backlogQuota
         topicPolicies.getBackLogQuotaMap()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5688683..fae2344 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -85,7 +85,7 @@
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
+        return isConsumersExceededOnSubscription(topic, consumerList.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5cdbff1..ce6a30b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -73,7 +73,7 @@
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
+        return isConsumersExceededOnSubscription(topic, consumers.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a240c0b..0ccbc02 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -161,7 +161,7 @@
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
+        return isConsumersExceededOnSubscription(topic, consumerList.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index d05076a..043f64d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -130,7 +130,7 @@
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
+        return isConsumersExceededOnSubscription(topic, consumers.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index 92dd3aa..1492fe0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -18,34 +18,6 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelPromise;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.impl.EntryImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.EntryBatchSizes;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.RedeliveryTracker;
-import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.IObjectFactory;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.ObjectFactory;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
@@ -61,6 +33,33 @@
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelPromise;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
+import org.apache.pulsar.common.protocol.Commands;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
 
 @PrepareForTest({ DispatchRateLimiter.class })
 @PowerMockIgnore({"org.apache.logging.log4j.*"})
@@ -95,9 +94,13 @@
         brokerMock = mock(BrokerService.class);
         doReturn(pulsarMock).when(brokerMock).pulsar();
 
+        HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
+        topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
+
         topicMock = mock(NonPersistentTopic.class);
         doReturn(brokerMock).when(topicMock).getBrokerService();
         doReturn(topicName).when(topicMock).getName();
+        doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
 
         subscriptionMock = mock(NonPersistentSubscription.class);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 3cb5bfb..9d085ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -18,9 +18,38 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
@@ -37,6 +66,7 @@
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.mockito.ArgumentCaptor;
@@ -49,37 +79,6 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyList;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.anySet;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
 @PrepareForTest({ DispatchRateLimiter.class })
 @PowerMockIgnore({"org.apache.logging.log4j.*"})
 @Test(groups = "broker")
@@ -118,9 +117,13 @@
         brokerMock = mock(BrokerService.class);
         doReturn(pulsarMock).when(brokerMock).pulsar();
 
+        HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
+        topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
+
         topicMock = mock(PersistentTopic.class);
         doReturn(brokerMock).when(topicMock).getBrokerService();
         doReturn(topicName).when(topicMock).getName();
+        doReturn(topicPolicies).when(topicMock).getHierarchyTopicPolicies();
 
         cursorMock = mock(ManagedCursorImpl.class);
         doReturn(null).when(cursorMock).getLastIndividualDeletedRange();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index fcdaeb7..662c191 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -42,6 +42,7 @@
     final PolicyHierarchyValue<Integer> topicMaxMessageSize;
     final PolicyHierarchyValue<Integer> messageTTLInSeconds;
     final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
+    final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
 
     public HierarchyTopicPolicies() {
         replicationClusters = new PolicyHierarchyValue<>();
@@ -51,6 +52,7 @@
         maxSubscriptionsPerTopic = new PolicyHierarchyValue<>();
         maxProducersPerTopic = new PolicyHierarchyValue<>();
         maxConsumerPerTopic = new PolicyHierarchyValue<>();
+        maxConsumersPerSubscription = new PolicyHierarchyValue<>();
         backLogQuotaMap = new ImmutableMap.Builder<BacklogQuotaType, PolicyHierarchyValue<BacklogQuota>>()
                 .put(BacklogQuotaType.destination_storage, new PolicyHierarchyValue<>())
                 .put(BacklogQuotaType.message_age, new PolicyHierarchyValue<>())