Support set namespace and topic policy for non-persistent dispatcher (#10121)
### Motivation
maxConsumerPerSubscription in NonPersistentDispatcherMultipleConsumers only support broker-level policies
### Modifications
Make it support namespace and topic level policies
### Verifying this change
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index b6f4ca7..1a19186 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -88,11 +88,7 @@
@Override
protected boolean isConsumersExceededOnSubscription() {
- final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
- if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
- return true;
- }
- return false;
+ return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 023e604..c607189 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2075,6 +2075,63 @@
}
@Test(timeOut = 20000)
+ public void testNonPersistentMaxConsumerOnSub() throws Exception {
+ int maxConsumerPerSubInBroker = 1;
+ int maxConsumerPerSubInNs = 2;
+ int maxConsumerPerSubInTopic = 3;
+ conf.setMaxConsumersPerSubscription(maxConsumerPerSubInBroker);
+ final String topic = "non-persistent://" + myNamespace + "/test-" + UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 3);
+ Producer producer = pulsarClient.newProducer().topic(topic).create();
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+ final String subName = "my-sub";
+ ConsumerBuilder builder = pulsarClient.newConsumer()
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subName).topic(topic);
+ Consumer consumer = builder.subscribe();
+
+ try {
+ builder.subscribe();
+ fail("should fail");
+ } catch (PulsarClientException e) {
+ assertTrue(e.getMessage().contains("reached max consumers limit"));
+ }
+ // set namespace policy
+ admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerPerSubInNs);
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace));
+ assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), maxConsumerPerSubInNs);
+ });
+ Consumer consumer2 = builder.subscribe();
+ try {
+ builder.subscribe();
+ fail("should fail");
+ } catch (PulsarClientException e) {
+ assertTrue(e.getMessage().contains("reached max consumers limit"));
+ }
+
+ //set topic policy
+ admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerPerSubInTopic);
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic));
+ assertEquals(admin.topics().getMaxConsumersPerSubscription(topic).intValue(), maxConsumerPerSubInTopic);
+ });
+ Consumer consumer3 = builder.subscribe();
+ try {
+ builder.subscribe();
+ fail("should fail");
+ } catch (PulsarClientException e) {
+ assertTrue(e.getMessage().contains("reached max consumers limit"));
+ }
+ consumer.close();
+ consumer2.close();
+ consumer3.close();
+ producer.close();
+ }
+
+ @Test(timeOut = 20000)
public void testGetCompactionThresholdApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();