Support set namespace and topic policy for non-persistent dispatcher (#10121)

### Motivation
maxConsumerPerSubscription in NonPersistentDispatcherMultipleConsumers only support broker-level policies

### Modifications
Make it support namespace and topic level policies

### Verifying this change

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index b6f4ca7..1a19186 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -88,11 +88,7 @@
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
-        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
-            return true;
-        }
-        return false;
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 023e604..c607189 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2075,6 +2075,63 @@
     }
 
     @Test(timeOut = 20000)
+    public void testNonPersistentMaxConsumerOnSub() throws Exception {
+        int maxConsumerPerSubInBroker = 1;
+        int maxConsumerPerSubInNs = 2;
+        int maxConsumerPerSubInTopic = 3;
+        conf.setMaxConsumersPerSubscription(maxConsumerPerSubInBroker);
+        final String topic = "non-persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 3);
+        Producer producer = pulsarClient.newProducer().topic(topic).create();
+
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        final String subName = "my-sub";
+        ConsumerBuilder builder = pulsarClient.newConsumer()
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName).topic(topic);
+        Consumer consumer = builder.subscribe();
+
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException e) {
+            assertTrue(e.getMessage().contains("reached max consumers limit"));
+        }
+        // set namespace policy
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerPerSubInNs);
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace));
+            assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), maxConsumerPerSubInNs);
+        });
+        Consumer consumer2 = builder.subscribe();
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException e) {
+            assertTrue(e.getMessage().contains("reached max consumers limit"));
+        }
+
+        //set topic policy
+        admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerPerSubInTopic);
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic));
+            assertEquals(admin.topics().getMaxConsumersPerSubscription(topic).intValue(), maxConsumerPerSubInTopic);
+        });
+        Consumer consumer3 = builder.subscribe();
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException e) {
+            assertTrue(e.getMessage().contains("reached max consumers limit"));
+        }
+        consumer.close();
+        consumer2.close();
+        consumer3.close();
+        producer.close();
+    }
+
+    @Test(timeOut = 20000)
     public void testGetCompactionThresholdApplied() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();