| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.admin; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import lombok.Cleanup; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.mledger.ManagedLedgerConfig; |
| import org.apache.pulsar.broker.ConfigHelper; |
| import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; |
| import org.apache.pulsar.broker.service.BacklogQuotaManager; |
| import org.apache.pulsar.broker.service.PublishRateLimiterImpl; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.ConsumerBuilder; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.api.SubscriptionInitialPosition; |
| import org.apache.pulsar.client.api.SubscriptionMode; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.DispatchRate; |
| import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; |
| import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; |
| import org.apache.pulsar.common.policies.data.PersistencePolicies; |
| import org.apache.pulsar.common.policies.data.PublishRate; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.SubscribeRate; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.awaitility.Awaitility; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import java.lang.reflect.Field; |
| import java.lang.reflect.Method; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| @Slf4j |
| @Test(groups = "broker") |
| public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { |
| |
| private final String testTenant = "my-tenant"; |
| |
| private final String testNamespace = "my-namespace"; |
| |
| private final String myNamespace = testTenant + "/" + testNamespace; |
| |
| private final String testTopic = "persistent://" + myNamespace + "/test-set-backlog-quota"; |
| |
| private final String persistenceTopic = "persistent://" + myNamespace + "/test-set-persistence"; |
| |
| @BeforeMethod |
| @Override |
| protected void setup() throws Exception { |
| this.conf.setSystemTopicEnabled(true); |
| this.conf.setTopicLevelPoliciesEnabled(true); |
| this.conf.setDefaultNumberOfNamespaceBundles(1); |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant(this.testTenant, tenantInfo); |
| admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test")); |
| admin.topics().createPartitionedTopic(testTopic, 2); |
| Producer producer = pulsarClient.newProducer().topic(testTopic).create(); |
| producer.close(); |
| waitForZooKeeperWatchers(); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| public void cleanup() throws Exception { |
| super.internalCleanup(); |
| this.resetConfig(); |
| } |
| |
| @Test |
| public void testSetBacklogQuota() throws Exception { |
| |
| BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| log.info("Backlog quota set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) |
| .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); |
| |
| BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); |
| BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); |
| log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic); |
| Assert.assertEquals(backlogQuota, backlogQuotaInManager); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveBacklogQuota() throws Exception { |
| BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| log.info("Backlog quota set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) |
| .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); |
| |
| BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); |
| BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); |
| log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic); |
| Assert.assertEquals(backlogQuota, backlogQuotaInManager); |
| |
| admin.topics().removeBacklogQuota(testTopic); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic) |
| .get(BacklogQuota.BacklogQuotaType.destination_storage))); |
| |
| backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); |
| log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager, |
| testTopic); |
| Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testCheckBacklogQuota() throws Exception { |
| RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10); |
| String namespace = TopicName.get(testTopic).getNamespace(); |
| admin.namespaces().setRetention(namespace, retentionPolicies); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies)); |
| |
| BacklogQuota backlogQuota = |
| new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); |
| try { |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| |
| backlogQuota = |
| new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); |
| try { |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| |
| backlogQuota = |
| new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| |
| BacklogQuota finalBacklogQuota = backlogQuota; |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) |
| .get(BacklogQuota.BacklogQuotaType.destination_storage), finalBacklogQuota)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testGetBacklogQuotaApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap()); |
| assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap()); |
| Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf); |
| assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap); |
| BacklogQuota namespaceQuota = new BacklogQuota(30L, BacklogQuota.RetentionPolicy.producer_exception); |
| |
| admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota); |
| Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); |
| Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap(); |
| namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota); |
| assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap); |
| |
| BacklogQuota topicQuota = new BacklogQuota(40L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| admin.topics().setBacklogQuota(topic, topicQuota); |
| Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty())); |
| Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap(); |
| topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota); |
| assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap); |
| |
| admin.namespaces().removeBacklogQuota(myNamespace); |
| admin.topics().removeBacklogQuota(topic); |
| Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); |
| Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty())); |
| assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap); |
| } |
| |
| @Test |
| public void testCheckBacklogQuotaFailed() throws Exception { |
| RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10); |
| String namespace = TopicName.get(testTopic).getNamespace(); |
| admin.namespaces().setRetention(namespace, retentionPolicies); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies)); |
| |
| BacklogQuota backlogQuota = |
| new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| try { |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| //Ensure that the cache has not been updated after a long time |
| Awaitility.await().atLeast(1, TimeUnit.SECONDS); |
| assertNull(admin.topics().getBacklogQuotaMap(testTopic) |
| .get(BacklogQuota.BacklogQuotaType.destination_storage)); |
| } |
| |
| @Test |
| public void testCheckRetention() throws Exception { |
| BacklogQuota backlogQuota = |
| new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setBacklogQuota(testTopic, backlogQuota); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) |
| .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); |
| |
| RetentionPolicies retention = new RetentionPolicies(10, 10); |
| log.info("Retention: {} will set to the topic: {}", retention, testTopic); |
| try { |
| admin.topics().setRetention(testTopic, retention); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| |
| retention = new RetentionPolicies(10, 9); |
| log.info("Retention: {} will set to the topic: {}", retention, testTopic); |
| try { |
| admin.topics().setRetention(testTopic, retention); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| |
| retention = new RetentionPolicies(10, 12); |
| log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); |
| admin.topics().setRetention(testTopic, retention); |
| |
| RetentionPolicies finalRetention = retention; |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), finalRetention)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testSetRetention() throws Exception { |
| RetentionPolicies retention = new RetentionPolicies(60, 1024); |
| log.info("Retention: {} will set to the topic: {}", retention, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setRetention(testTopic, retention); |
| log.info("Retention set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), retention)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveRetention() throws Exception { |
| |
| RetentionPolicies retention = new RetentionPolicies(60, 1024); |
| log.info("Retention: {} will set to the topic: {}", retention, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setRetention(testTopic, retention); |
| log.info("Retention set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), retention)); |
| |
| admin.topics().removeRetention(testTopic); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getRetention(testTopic))); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test(timeOut = 10000) |
| public void testRetentionAppliedApi() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| RetentionPolicies brokerPolicies = |
| new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), conf.getDefaultRetentionSizeInMB()); |
| assertEquals(admin.topics().getRetention(topic, true), brokerPolicies); |
| |
| RetentionPolicies namespacePolicies = new RetentionPolicies(10, 20); |
| admin.namespaces().setRetention(myNamespace, namespacePolicies); |
| Awaitility.await().untilAsserted(() |
| -> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies)); |
| |
| RetentionPolicies topicPolicies = new RetentionPolicies(20,30); |
| admin.topics().setRetention(topic, topicPolicies); |
| Awaitility.await().untilAsserted(() |
| -> assertEquals(admin.topics().getRetention(topic, true), topicPolicies)); |
| |
| admin.topics().removeRetention(topic); |
| Awaitility.await().untilAsserted(() |
| -> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies)); |
| |
| admin.namespaces().removeRetention(myNamespace); |
| Awaitility.await().untilAsserted(() |
| -> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies)); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testGetSubDispatchRateApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getSubscriptionDispatchRate(topic)); |
| assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)); |
| DispatchRate brokerDispatchRate = new DispatchRate( |
| pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), |
| pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(), |
| 1 |
| ); |
| assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate); |
| DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12); |
| |
| admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace))); |
| assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), namespaceDispatchRate); |
| |
| DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22); |
| admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionDispatchRate(topic))); |
| assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), topicDispatchRate); |
| |
| admin.namespaces().removeSubscriptionDispatchRate(myNamespace); |
| admin.topics().removeSubscriptionDispatchRate(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscriptionDispatchRate(topic))); |
| assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testRetentionPriority() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getRetention(topic)); |
| assertNull(admin.namespaces().getRetention(myNamespace)); |
| |
| PersistentTopic persistentTopic = |
| (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained"); |
| shouldTopicBeRetained.setAccessible(true); |
| Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive"); |
| lastActive.setAccessible(true); |
| //set last active to 2 minutes ago |
| lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2)); |
| //the default value of the broker-level is 0, so it is not retained by default |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| //set namespace-level policy |
| RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1); |
| admin.namespaces().setRetention(myNamespace, retentionPolicies); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNotNull(admin.namespaces().getRetention(myNamespace))); |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| // set topic-level policy |
| admin.topics().setRetention(topic, new RetentionPolicies(3, 1)); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNotNull(admin.topics().getRetention(topic))); |
| assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| //topic-level disabled |
| admin.topics().setRetention(topic, new RetentionPolicies(0, 0)); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertEquals(admin.topics().getRetention(topic).getRetentionSizeInMB(), 0)); |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| // remove topic-level policy |
| admin.topics().removeRetention(topic); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNull(admin.topics().getRetention(topic))); |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| //namespace-level disabled |
| admin.namespaces().setRetention(myNamespace, new RetentionPolicies(0, 0)); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNotNull(admin.namespaces().getRetention(myNamespace))); |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| //change namespace-level policy |
| admin.namespaces().setRetention(myNamespace, new RetentionPolicies(1, 1)); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNotNull(admin.namespaces().getRetention(myNamespace))); |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| // remove namespace-level policy |
| admin.namespaces().removeRetention(myNamespace); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNull(admin.namespaces().getRetention(myNamespace))); |
| //the default value of the broker-level is 0, so it is not retained by default |
| assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic)); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testGetPersistenceApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getPersistence(topic)); |
| assertNull(admin.namespaces().getPersistence(myNamespace)); |
| PersistencePolicies brokerPolicy |
| = new PersistencePolicies(pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(), |
| pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(), |
| pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(), |
| pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit()); |
| assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy); |
| PersistencePolicies namespacePolicy |
| = new PersistencePolicies(5,4,3,2); |
| |
| admin.namespaces().setPersistence(myNamespace, namespacePolicy); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getPersistence(myNamespace))); |
| assertEquals(admin.topics().getPersistence(topic, true), namespacePolicy); |
| |
| PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1); |
| admin.topics().setPersistence(topic, topicPolicy); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getPersistence(topic))); |
| assertEquals(admin.topics().getPersistence(topic, true), topicPolicy); |
| |
| admin.namespaces().removePersistence(myNamespace); |
| admin.topics().removePersistence(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getPersistence(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getPersistence(topic))); |
| assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy); |
| } |
| |
| @Test |
| public void testCheckPersistence() throws Exception { |
| PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic); |
| try { |
| admin.topics().setPersistence(testTopic, persistencePolicies); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 400); |
| } |
| |
| persistencePolicies = new PersistencePolicies(2, 6, 2, 0.0); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic); |
| try { |
| admin.topics().setPersistence(testTopic, persistencePolicies); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 400); |
| } |
| |
| persistencePolicies = new PersistencePolicies(2, 2, 6, 0.0); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic); |
| try { |
| admin.topics().setPersistence(testTopic, persistencePolicies); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 400); |
| } |
| |
| persistencePolicies = new PersistencePolicies(1, 2, 2, 0.0); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic); |
| try { |
| admin.topics().setPersistence(testTopic, persistencePolicies); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 400); |
| } |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testSetPersistence() throws Exception { |
| PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3); |
| admin.namespaces().setPersistence(myNamespace, persistencePoliciesForNamespace); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPersistence(myNamespace), persistencePoliciesForNamespace)); |
| |
| PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setPersistence(persistenceTopic, persistencePolicies); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getPersistence(persistenceTopic), persistencePolicies)); |
| |
| admin.lookups().lookupTopic(persistenceTopic); |
| Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get(); |
| PersistentTopic persistentTopic = (PersistentTopic) t; |
| ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); |
| assertEquals(managedLedgerConfig.getEnsembleSize(), 3); |
| assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3); |
| assertEquals(managedLedgerConfig.getAckQuorumSize(), 3); |
| assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1); |
| |
| PersistencePolicies getPersistencePolicies = admin.topics().getPersistence(persistenceTopic); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic); |
| Assert.assertEquals(getPersistencePolicies, persistencePolicies); |
| } |
| |
| @Test |
| public void testGetDispatchRateApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getDispatchRate(topic)); |
| assertNull(admin.namespaces().getDispatchRate(myNamespace)); |
| DispatchRate brokerDispatchRate = new DispatchRate( |
| pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(), |
| pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(), |
| 1 |
| ); |
| assertEquals(admin.topics().getDispatchRate(topic, true), brokerDispatchRate); |
| DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12); |
| |
| admin.namespaces().setDispatchRate(myNamespace, namespaceDispatchRate); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getDispatchRate(myNamespace))); |
| assertEquals(admin.topics().getDispatchRate(topic, true), namespaceDispatchRate); |
| |
| DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22); |
| admin.topics().setDispatchRate(topic, topicDispatchRate); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getDispatchRate(topic))); |
| assertEquals(admin.topics().getDispatchRate(topic, true), topicDispatchRate); |
| |
| admin.namespaces().removeDispatchRate(myNamespace); |
| admin.topics().removeDispatchRate(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDispatchRate(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getDispatchRate(topic))); |
| assertEquals(admin.topics().getDispatchRate(topic, true), brokerDispatchRate); |
| } |
| |
| @Test |
| public void testRemovePersistence() throws Exception { |
| PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3); |
| admin.namespaces().setPersistence(myNamespace, persistencePoliciesForNamespace); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPersistence(myNamespace), persistencePoliciesForNamespace)); |
| |
| PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1); |
| log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setPersistence(persistenceTopic, persistencePolicies); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getPersistence(persistenceTopic), persistencePolicies)); |
| |
| admin.topics().removePersistence(persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getPersistence(persistenceTopic))); |
| |
| admin.lookups().lookupTopic(persistenceTopic); |
| Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get(); |
| PersistentTopic persistentTopic = (PersistentTopic) t; |
| ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); |
| assertEquals(managedLedgerConfig.getEnsembleSize(), 2); |
| assertEquals(managedLedgerConfig.getWriteQuorumSize(), 2); |
| assertEquals(managedLedgerConfig.getAckQuorumSize(), 2); |
| assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.3); |
| } |
| |
| @Test |
| public void testCheckMaxProducers() throws Exception { |
| int maxProducers = -1; |
| log.info("MaxProducers: {} will set to the topic: {}", maxProducers, testTopic); |
| try { |
| admin.topics().setMaxProducers(testTopic, maxProducers); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testGetMaxProducerApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getMaxProducers(topic)); |
| assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace)); |
| assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic()); |
| |
| admin.namespaces().setMaxProducersPerTopic(myNamespace, 7); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getMaxProducersPerTopic(myNamespace))); |
| assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), 7); |
| |
| admin.topics().setMaxProducers(topic, 1000); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxProducers(topic))); |
| assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), 1000); |
| |
| admin.namespaces().removeMaxProducersPerTopic(myNamespace); |
| admin.topics().removeMaxProducers(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getMaxProducers(topic))); |
| assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic()); |
| } |
| |
| @Test |
| public void testSetMaxProducers() throws Exception { |
| Integer maxProducers = 2; |
| log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setMaxProducers(persistenceTopic, maxProducers); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers)); |
| |
| admin.topics().createPartitionedTopic(persistenceTopic, 2); |
| Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Producer<byte[]> producer3 = null; |
| |
| try { |
| producer3 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Topic reached max producers limit on topic level."); |
| } |
| Assert.assertNotNull(producer1); |
| Assert.assertNotNull(producer2); |
| Assert.assertNull(producer3); |
| |
| admin.topics().deletePartitionedTopic(persistenceTopic, true); |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveMaxProducers() throws Exception { |
| Integer maxProducers = 2; |
| log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setMaxProducers(persistenceTopic, maxProducers); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers)); |
| |
| admin.topics().createPartitionedTopic(persistenceTopic, 2); |
| Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Producer<byte[]> producer3 = null; |
| Producer<byte[]> producer4 = null; |
| |
| try { |
| producer3 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Topic reached max producers limit on topic level."); |
| } |
| Assert.assertNotNull(producer1); |
| Assert.assertNotNull(producer2); |
| Assert.assertNull(producer3); |
| |
| admin.topics().removeMaxProducers(persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getMaxProducers(persistenceTopic))); |
| |
| producer3 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Assert.assertNotNull(producer3); |
| admin.namespaces().setMaxProducersPerTopic(myNamespace, 3); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace).intValue(), 3)); |
| |
| log.info("MaxProducers: {} will set to the namespace: {}", 3, myNamespace); |
| try { |
| producer4 = pulsarClient.newProducer().topic(persistenceTopic).create(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Topic reached max producers limit on namespace level."); |
| } |
| Assert.assertNull(producer4); |
| |
| producer1.close(); |
| producer2.close(); |
| producer3.close(); |
| admin.topics().deletePartitionedTopic(persistenceTopic, true); |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| |
| @Test |
| public void testGetSetDispatchRate() throws Exception { |
| DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true); |
| log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setDispatchRate(testTopic, dispatchRate); |
| log.info("Dispatch Rate set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getDispatchRate(testTopic), dispatchRate)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveDispatchRate() throws Exception { |
| DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true); |
| log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setDispatchRate(testTopic, dispatchRate); |
| log.info("Dispatch Rate set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getDispatchRate(testTopic), dispatchRate)); |
| |
| admin.topics().removeDispatchRate(testTopic); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getDispatchRate(testTopic))); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testPolicyOverwrittenByNamespaceLevel() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| admin.topics().createNonPartitionedTopic(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| DispatchRate dispatchRate = new DispatchRate(200, 20000, 1, true); |
| admin.namespaces().setDispatchRate(myNamespace, dispatchRate); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getDispatchRate(myNamespace).dispatchThrottlingRateInMsg, 200)); |
| |
| dispatchRate = new DispatchRate(100, 10000, 1, true); |
| admin.topics().setDispatchRate(topic, dispatchRate); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNotNull(admin.topics().getDispatchRate(topic))); |
| |
| //1 Set ns level policy, topic level should not be overwritten |
| dispatchRate = new DispatchRate(300, 30000, 2, true); |
| admin.namespaces().setDispatchRate(myNamespace, dispatchRate); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> { |
| DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get(); |
| Assert.assertEquals(limiter.getDispatchRateOnByte(), 10000); |
| Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100); |
| }); |
| |
| admin.topics().removeDispatchRate(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getDispatchRate(topic))); |
| |
| //2 Remove level policy ,DispatchRateLimiter should us ns level policy |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> { |
| DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get(); |
| Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000); |
| Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300); |
| }); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testRestart() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| admin.topics().createNonPartitionedTopic(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| InactiveTopicPolicies inactiveTopicPolicies = |
| new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true); |
| admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getInactiveTopicPolicies(myNamespace).getInactiveTopicDeleteMode(), |
| InactiveTopicDeleteMode.delete_when_subscriptions_caught_up)); |
| |
| inactiveTopicPolicies = |
| new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false); |
| admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic))); |
| |
| // restart broker, policy should still take effect |
| restartBroker(); |
| |
| // Trigger the cache init. |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| |
| InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies; |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .untilAsserted(() -> { |
| PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies); |
| }); |
| |
| producer.close(); |
| } |
| |
| @Test |
| public void testGetSetSubscriptionDispatchRate() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| admin.topics().createNonPartitionedTopic(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| DispatchRate dispatchRate = new DispatchRate(1000, |
| 1024 * 1024, 1); |
| log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic); |
| |
| admin.topics().setSubscriptionDispatchRate(topic, dispatchRate); |
| log.info("Subscription dispatch rate set success on topic: {}", topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate)); |
| |
| String subscriptionName = "test_subscription_rate"; |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe(); |
| |
| DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) |
| .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); |
| Assert.assertNotNull(dispatchRateLimiter); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg); |
| |
| consumer.close(); |
| admin.topics().delete(topic, true); |
| } |
| |
| @Test |
| public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| admin.topics().createNonPartitionedTopic(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| DispatchRate dispatchRate = new DispatchRate(1000, |
| 1024 * 1024, 1); |
| log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic); |
| |
| String subscriptionName = "test_subscription_rate"; |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe(); |
| |
| admin.topics().setSubscriptionDispatchRate(topic, dispatchRate); |
| log.info("Subscription dispatch rate set success on topic: {}", topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate)); |
| |
| DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) |
| .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); |
| Assert.assertNotNull(dispatchRateLimiter); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg); |
| |
| consumer.close(); |
| admin.topics().delete(topic, true); |
| } |
| |
/**
 * Sets a topic-level subscription dispatch rate, verifies the subscription's limiter uses it,
 * then removes the policy and verifies the limiter no longer reports the removed values.
 */
@Test
public void testRemoveSubscriptionDispatchRate() throws Exception {
    final String topic = testTopic + UUID.randomUUID();
    admin.topics().createNonPartitionedTopic(topic);

    Awaitility.await().atMost(3, TimeUnit.SECONDS)
            .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

    DispatchRate dispatchRate = new DispatchRate(1000,
            1024 * 1024, 1);
    log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);

    admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
    log.info("Subscription dispatch rate set success on topic: {}", topic);

    Awaitility.await().atMost(3, TimeUnit.SECONDS)
            .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate));

    String subscriptionName = "test_subscription_rate";
    Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();

    // The policy was in place before subscribing, so this check is done once.
    DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
            .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
    Assert.assertNotNull(dispatchRateLimiter);
    Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
    Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);

    // remove subscription dispatch rate
    admin.topics().removeSubscriptionDispatchRate(topic);

    Awaitility.await().atMost(3, TimeUnit.SECONDS)
            .untilAsserted(() -> Assert.assertNull(admin.topics().getSubscriptionDispatchRate(topic)));

    // The limiter of the live dispatcher may be refreshed slightly after the admin API stops
    // returning the policy, so poll for the change instead of asserting once.
    Awaitility.await().atMost(3, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic)
                        .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
                Assert.assertNotEquals(limiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
                Assert.assertNotEquals(limiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
            });

    consumer.close();
    admin.topics().delete(topic, true);
}
| |
| @Test |
| public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| admin.topics().createNonPartitionedTopic(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| // set namespace level subscription dispatch rate |
| DispatchRate namespaceDispatchRate = new DispatchRate(100, 1024 * 1024, 1); |
| admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(myNamespace), namespaceDispatchRate)); |
| |
| String subscriptionName = "test_subscription_rate"; |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe(); |
| |
| // get subscription dispatch Rate limiter |
| DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic) |
| .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte); |
| |
| // set topic level subscription dispatch rate |
| DispatchRate topicDispatchRate = new DispatchRate(200, 2 * 1024 * 1024, 1); |
| admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), topicDispatchRate)); |
| |
| // get subscription dispatch rate limiter |
| dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get() |
| .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), topicDispatchRate.dispatchThrottlingRateInByte); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), topicDispatchRate.dispatchThrottlingRateInMsg); |
| |
| // remove topic level subscription dispatch rate limiter |
| admin.topics().removeSubscriptionDispatchRate(topic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getSubscriptionDispatchRate(topic))); |
| |
| // get subscription dispatch rate limiter |
| dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get() |
| .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get(); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte); |
| Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg); |
| |
| consumer.close(); |
| admin.topics().delete(topic, true); |
| } |
| |
| @Test |
| public void testGetSetCompactionThreshold() throws Exception { |
| Long compactionThreshold = 100000L; |
| log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setCompactionThreshold(testTopic, compactionThreshold); |
| log.info("Compaction threshold set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getCompactionThreshold(testTopic), compactionThreshold)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveCompactionThreshold() throws Exception { |
| Long compactionThreshold = 100000L; |
| log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setCompactionThreshold(testTopic, compactionThreshold); |
| log.info("Compaction threshold set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getCompactionThreshold(testTopic), compactionThreshold)); |
| |
| admin.topics().removeCompactionThreshold(testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getCompactionThreshold(testTopic))); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testGetSetMaxConsumersPerSubscription() throws Exception { |
| Integer maxConsumersPerSubscription = 10; |
| log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription); |
| log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumersPerSubscription(testTopic), maxConsumersPerSubscription)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveMaxConsumersPerSubscription() throws Exception { |
| Integer maxConsumersPerSubscription = 10; |
| log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription); |
| log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumersPerSubscription(testTopic), maxConsumersPerSubscription)); |
| |
| admin.topics().removeMaxConsumersPerSubscription(testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getMaxConsumersPerSubscription(testTopic))); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testGetSetPublishRate() throws Exception { |
| PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); |
| log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setPublishRate(testTopic, publishRate); |
| log.info("Publish Rate set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getPublishRate(testTopic), publishRate)); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemovePublishRate() throws Exception { |
| PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); |
| log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setPublishRate(testTopic, publishRate); |
| log.info("Publish Rate set success on topic: {}", testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getPublishRate(testTopic), publishRate)); |
| |
| admin.topics().removePublishRate(testTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getPublishRate(testTopic))); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testCheckMaxConsumers() throws Exception { |
| Integer maxProducers = new Integer(-1); |
| log.info("MaxConsumers: {} will set to the topic: {}", maxProducers, testTopic); |
| try { |
| admin.topics().setMaxConsumers(testTopic, maxProducers); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| } |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| @Test |
| public void testGetMaxConsumersApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getMaxConsumers(topic)); |
| assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)); |
| assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic()); |
| |
| admin.namespaces().setMaxConsumersPerTopic(myNamespace, 7); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace))); |
| assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 7); |
| |
| admin.topics().setMaxConsumers(topic, 1000); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic))); |
| assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 1000); |
| |
| admin.namespaces().removeMaxConsumersPerTopic(myNamespace); |
| admin.topics().removeMaxConsumers(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getMaxConsumers(topic))); |
| assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic()); |
| } |
| |
| @Test |
| public void testSetMaxConsumers() throws Exception { |
| admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(myNamespace).intValue(), 1)); |
| log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace); |
| Integer maxConsumers = 2; |
| log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setMaxConsumers(persistenceTopic, maxConsumers); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers)); |
| |
| admin.topics().createPartitionedTopic(persistenceTopic, 2); |
| Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe(); |
| Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe(); |
| Consumer<byte[]> consumer3 = null; |
| |
| try { |
| consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Topic reached max consumers limit"); |
| } |
| Assert.assertNotNull(consumer1); |
| Assert.assertNotNull(consumer2); |
| Assert.assertNull(consumer3); |
| consumer1.close(); |
| consumer2.close(); |
| |
| admin.topics().deletePartitionedTopic(persistenceTopic, true); |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testRemoveMaxConsumers() throws Exception { |
| Integer maxConsumers = 2; |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| admin.topics().setMaxConsumers(persistenceTopic, maxConsumers); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers)); |
| |
| admin.topics().createPartitionedTopic(persistenceTopic, 2); |
| Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe(); |
| Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe(); |
| Consumer<byte[]> consumer3 = null; |
| |
| try { |
| consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Topic reached max consumers limit"); |
| } |
| Assert.assertNotNull(consumer1); |
| Assert.assertNotNull(consumer2); |
| Assert.assertNull(consumer3); |
| |
| admin.topics().removeMaxConsumers(persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getMaxConsumers(persistenceTopic))); |
| |
| consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe(); |
| Assert.assertNotNull(consumer3); |
| admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3); |
| log.info("MaxConsumers: {} will set to the namespace: {}", 3, myNamespace); |
| |
| Consumer<byte[]> consumer4 = null; |
| try { |
| consumer4 = pulsarClient.newConsumer().subscriptionName("sub4").topic(persistenceTopic).subscribe(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Topic reached max consumers limit on namespace level."); |
| } |
| Assert.assertNull(consumer4); |
| |
| consumer1.close(); |
| consumer2.close(); |
| consumer3.close(); |
| admin.topics().deletePartitionedTopic(persistenceTopic, true); |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testGetSetSubscribeRate() throws Exception { |
| admin.topics().createPartitionedTopic(persistenceTopic, 2); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| SubscribeRate subscribeRate1 = new SubscribeRate(1, 30); |
| log.info("Subscribe Rate: {} will be set to the namespace: {}", subscribeRate1, myNamespace); |
| admin.namespaces().setSubscribeRate(myNamespace, subscribeRate1); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getSubscribeRate(myNamespace), subscribeRate1)); |
| |
| SubscribeRate subscribeRate2 = new SubscribeRate(2, 30); |
| log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate2, persistenceTopic); |
| admin.topics().setSubscribeRate(persistenceTopic, subscribeRate2); |
| log.info("Subscribe Rate set success on topic: {}", persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscribeRate(persistenceTopic), subscribeRate2)); |
| |
| PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0); |
| PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0); |
| PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0); |
| |
| Consumer consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer1); |
| consumer1.close(); |
| pulsarClient1.shutdown(); |
| |
| Consumer consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer2); |
| consumer2.close(); |
| pulsarClient2.shutdown(); |
| |
| Consumer consumer3 = null; |
| |
| try { |
| consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("subscribe rate reached max subscribe rate limit"); |
| } |
| |
| Assert.assertNull(consumer3); |
| pulsarClient3.shutdown(); |
| |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| admin.topics().deletePartitionedTopic(persistenceTopic, true); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testGetSubscribeRateApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getSubscribeRate(topic)); |
| assertNull(admin.namespaces().getSubscribeRate(myNamespace)); |
| SubscribeRate brokerPolicy = new SubscribeRate( |
| pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(), |
| pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond() |
| ); |
| assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy); |
| SubscribeRate namespacePolicy = new SubscribeRate(10, 11); |
| |
| admin.namespaces().setSubscribeRate(myNamespace, namespacePolicy); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscribeRate(myNamespace))); |
| assertEquals(admin.topics().getSubscribeRate(topic, true), namespacePolicy); |
| |
| SubscribeRate topicPolicy = new SubscribeRate(20, 21); |
| admin.topics().setSubscribeRate(topic, topicPolicy); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscribeRate(topic))); |
| assertEquals(admin.topics().getSubscribeRate(topic, true), topicPolicy); |
| |
| admin.namespaces().removeSubscribeRate(myNamespace); |
| admin.topics().removeSubscribeRate(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscribeRate(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscribeRate(topic))); |
| assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testPriorityAndDisableMaxConsumersOnSub() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| int maxConsumerInBroker = 1; |
| int maxConsumerInNs = 2; |
| int maxConsumerInTopic = 4; |
| String mySub = "my-sub"; |
| conf.setMaxConsumersPerSubscription(maxConsumerInBroker); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().until(() -> |
| pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| List<Consumer<String>> consumerList = new ArrayList<>(); |
| ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING) |
| .subscriptionType(SubscriptionType.Shared) |
| .topic(topic).subscriptionName(mySub); |
| consumerList.add(builder.subscribe()); |
| try { |
| builder.subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException ignored) { |
| } |
| |
| admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs); |
| Awaitility.await().untilAsserted(() -> |
| assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace))); |
| consumerList.add(builder.subscribe()); |
| try { |
| builder.subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException ignored) { |
| } |
| //disabled |
| admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0)); |
| consumerList.add(builder.subscribe()); |
| //set topic-level |
| admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic); |
| Awaitility.await().untilAsserted(() -> |
| assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic))); |
| consumerList.add(builder.subscribe()); |
| try { |
| builder.subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException ignored) { |
| } |
| //remove topic policies |
| admin.topics().removeMaxConsumersPerSubscription(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin.topics().getMaxConsumersPerSubscription(topic))); |
| consumerList.add(builder.subscribe()); |
| //remove namespace policies, then use broker-level |
| admin.namespaces().removeMaxConsumersPerSubscription(myNamespace); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace))); |
| try { |
| builder.subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException ignored) { |
| } |
| |
| for (Consumer<String> consumer : consumerList) { |
| consumer.close(); |
| } |
| } |
| |
| @Test |
| public void testRemoveSubscribeRate() throws Exception { |
| admin.topics().createPartitionedTopic(persistenceTopic, 2); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic))); |
| |
| SubscribeRate subscribeRate = new SubscribeRate(2, 30); |
| log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic); |
| admin.topics().setSubscribeRate(persistenceTopic, subscribeRate); |
| log.info("Subscribe Rate set success on topic: {}", persistenceTopic); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscribeRate(persistenceTopic), subscribeRate)); |
| |
| PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0); |
| PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0); |
| PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0); |
| |
| Consumer<byte[]> consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer1); |
| consumer1.close(); |
| pulsarClient1.shutdown(); |
| |
| Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer2); |
| consumer2.close(); |
| pulsarClient2.shutdown(); |
| |
| Consumer<byte[]> consumer3 = null; |
| |
| try { |
| consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("subscribe rate reached max subscribe rate limit"); |
| } |
| Assert.assertNull(consumer3); |
| |
| admin.topics().removeSubscribeRate(persistenceTopic); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .untilAsserted(() -> Assert.assertNull(admin.topics().getSubscribeRate(persistenceTopic))); |
| |
| admin.topics().unload(persistenceTopic); |
| |
| PulsarClient pulsarClient4 = newPulsarClient(lookupUrl.toString(), 0); |
| PulsarClient pulsarClient5 = newPulsarClient(lookupUrl.toString(), 0); |
| PulsarClient pulsarClient6 = newPulsarClient(lookupUrl.toString(), 0); |
| |
| consumer3 = pulsarClient3.newConsumer().subscriptionName("sub2") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer3); |
| consumer3.close(); |
| pulsarClient3.shutdown(); |
| |
| Consumer<byte[]> consumer4 = pulsarClient4.newConsumer().subscriptionName("sub2") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer4); |
| consumer4.close(); |
| pulsarClient4.shutdown(); |
| |
| Consumer<byte[]> consumer5 = pulsarClient5.newConsumer().subscriptionName("sub2") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer5); |
| consumer5.close(); |
| pulsarClient5.shutdown(); |
| |
| Consumer<byte[]> consumer6 = pulsarClient6.newConsumer().subscriptionName("sub2") |
| .topic(persistenceTopic).consumerName("test").subscribe(); |
| Assert.assertNotNull(consumer6); |
| consumer6.close(); |
| pulsarClient6.shutdown(); |
| |
| admin.topics().deletePartitionedTopic(persistenceTopic, true); |
| admin.topics().deletePartitionedTopic(testTopic, true); |
| } |
| |
| @Test |
| public void testPublishRateInDifferentLevelPolicy() throws Exception { |
| cleanup(); |
| conf.setMaxPublishRatePerTopicInMessages(5); |
| conf.setMaxPublishRatePerTopicInBytes(50L); |
| setup(); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); |
| |
| final String topicName = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topicName).create().close(); |
| Field publishMaxMessageRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate"); |
| publishMaxMessageRate.setAccessible(true); |
| Field publishMaxByteRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxByteRate"); |
| publishMaxByteRate.setAccessible(true); |
| |
| //1 use broker-level policy by default |
| PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); |
| PublishRateLimiterImpl publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5); |
| Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L); |
| |
| //2 set namespace-level policy |
| PublishRate publishMsgRate = new PublishRate(10, 100L); |
| admin.namespaces().setPublishRate(myNamespace, publishMsgRate); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> { |
| PublishRateLimiterImpl limiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| return (int)publishMaxMessageRate.get(limiter) == 10; |
| }); |
| |
| publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 10); |
| Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 100L); |
| |
| //3 set topic-level policy, namespace-level policy should be overwritten |
| PublishRate publishMsgRate2 = new PublishRate(11, 101L); |
| admin.topics().setPublishRate(topicName, publishMsgRate2); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> admin.topics().getPublishRate(topicName) != null); |
| |
| publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 11); |
| Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 101L); |
| |
| //4 remove topic-level policy, namespace-level policy will take effect |
| admin.topics().removePublishRate(topicName); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> admin.topics().getPublishRate(topicName) == null); |
| |
| publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 10); |
| Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 100L); |
| |
| //5 remove namespace-level policy, broker-level policy will take effect |
| admin.namespaces().removePublishRate(myNamespace); |
| |
| Awaitility.await().atMost(3, TimeUnit.SECONDS) |
| .until(() -> { |
| PublishRateLimiterImpl limiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| return (int)publishMaxMessageRate.get(limiter) == 5; |
| }); |
| |
| publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter(); |
| Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5); |
| Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testTopicMaxMessageSizeApi() throws Exception{ |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic))); |
| assertNull(admin.topics().getMaxMessageSize(persistenceTopic)); |
| |
| admin.topics().setMaxMessageSize(persistenceTopic,10); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null); |
| assertEquals(admin.topics().getMaxMessageSize(persistenceTopic).intValue(),10); |
| |
| admin.topics().removeMaxMessageSize(persistenceTopic); |
| assertNull(admin.topics().getMaxMessageSize(persistenceTopic)); |
| |
| try { |
| admin.topics().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE); |
| fail("should fail"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(),412); |
| } |
| try { |
| admin.topics().setMaxMessageSize(persistenceTopic, -1); |
| fail("should fail"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(),412); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testTopicMaxMessageSize() throws Exception{ |
| doTestTopicMaxMessageSize(true); |
| doTestTopicMaxMessageSize(false); |
| } |
| |
| private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception { |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| if (isPartitioned) { |
| admin.topics().createPartitionedTopic(topic, 3); |
| } |
| // init cache |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getMaxMessageSize(topic)); |
| // set msg size |
| admin.topics().setMaxMessageSize(topic, 10); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); |
| assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10); |
| |
| try { |
| producer.send(new byte[1024]); |
| } catch (PulsarClientException e) { |
| assertTrue(e instanceof PulsarClientException.NotAllowedException); |
| } |
| |
| admin.topics().removeMaxMessageSize(topic); |
| assertNull(admin.topics().getMaxMessageSize(topic)); |
| |
| try { |
| admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE); |
| fail("should fail"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), 412); |
| } |
| try { |
| admin.topics().setMaxMessageSize(topic, -1); |
| fail("should fail"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), 412); |
| } |
| |
| MessageId messageId = producer.send(new byte[1024]); |
| assertNotNull(messageId); |
| producer.close(); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testMaxSubscriptionsFailFast() throws Exception { |
| doTestMaxSubscriptionsFailFast(SubscriptionMode.Durable); |
| doTestMaxSubscriptionsFailFast(SubscriptionMode.NonDurable); |
| } |
| |
| private void doTestMaxSubscriptionsFailFast(SubscriptionMode subMode) throws Exception { |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| // init cache |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| int maxSubInNamespace = 2; |
| List<Consumer> consumers = new ArrayList<>(); |
| ConsumerBuilder consumerBuilder = pulsarClient.newConsumer().subscriptionMode(subMode) |
| .subscriptionType(SubscriptionType.Shared).topic(topic); |
| admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSubInNamespace); |
| Awaitility.await().untilAsserted(() |
| -> assertNotNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace))); |
| for (int i = 0; i < maxSubInNamespace; i++) { |
| consumers.add(consumerBuilder.subscriptionName("sub" + i).subscribe()); |
| } |
| long start = System.currentTimeMillis(); |
| try { |
| consumerBuilder.subscriptionName("sub").subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException e) { |
| assertTrue(e instanceof PulsarClientException.NotAllowedException); |
| } |
| //fail fast |
| assertTrue(System.currentTimeMillis() - start < 3000); |
| //clean |
| for (Consumer consumer : consumers) { |
| consumer.close(); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testMaxSubscriptionsPerTopicApi() throws Exception { |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| // init cache |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic)); |
| // set max subscriptions |
| admin.topics().setMaxSubscriptionsPerTopic(topic, 10); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); |
| assertEquals(admin.topics().getMaxSubscriptionsPerTopic(topic).intValue(), 10); |
| // remove max subscriptions |
| admin.topics().removeMaxSubscriptionsPerTopic(topic); |
| assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic)); |
| // set invalidate value |
| try { |
| admin.topics().setMaxMessageSize(topic, -1); |
| fail("should fail"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), 412); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception { |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| // init cache |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| // Set topic-level max subscriptions |
| final int topicLevelMaxSubNum = 2; |
| admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); |
| List<Consumer<String>> consumerList = new ArrayList<>(); |
| String subName = "my-sub-"; |
| for (int i = 0; i < topicLevelMaxSubNum; i++) { |
| Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) |
| .subscriptionType(SubscriptionType.Shared) |
| .subscriptionName(subName + i) |
| .topic(topic).subscribe(); |
| consumerList.add(consumer); |
| } |
| // should fail |
| try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS) |
| .serviceUrl(brokerUrl.toString()).build()) { |
| consumerList.add(client.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()) |
| .topic(topic).subscribe()); |
| fail("should fail"); |
| } catch (PulsarClientException ignore) { |
| assertEquals(consumerList.size(), topicLevelMaxSubNum); |
| } |
| //create a consumer with the same subscription name, it should succeed |
| pulsarClient.newConsumer(Schema.STRING) |
| .subscriptionType(SubscriptionType.Shared) |
| .subscriptionName(subName + "0") |
| .topic(topic).subscribe().close(); |
| |
| //Clean up |
| for (Consumer<String> c : consumerList) { |
| c.close(); |
| } |
| } |
| |
| @Test |
| public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception { |
| cleanup(); |
| conf.setMaxUnackedMessagesPerSubscription(30); |
| setup(); |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| // init cache |
| @Cleanup |
| Producer producer = pulsarClient.newProducer().topic(topic).create(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| //default value is null |
| assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace)); |
| int msgNum = 100; |
| int maxUnackedMsgOnTopic = 10; |
| int maxUnackedMsgNumOnNamespace = 5; |
| int defaultMaxUnackedMsgOnBroker = pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription(); |
| produceMsg(producer, msgNum); |
| //set namespace-level policy, the restriction should take effect |
| admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, maxUnackedMsgNumOnNamespace); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() |
| -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue(), maxUnackedMsgNumOnNamespace)); |
| List<Message<?>> messages; |
| String subName = "sub-" + UUID.randomUUID(); |
| ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topic) |
| .subscriptionName(subName).receiverQueueSize(1) |
| .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) |
| .subscriptionType(SubscriptionType.Shared); |
| @Cleanup |
| Consumer<byte[]> consumer1 = consumerBuilder.subscribe(); |
| messages = getMsgReceived(consumer1, msgNum); |
| assertEquals(messages.size(), maxUnackedMsgNumOnNamespace); |
| ackMessages(consumer1, messages); |
| //disable namespace-level policy, should unlimited |
| admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, 0); |
| Awaitility.await().untilAsserted(() |
| -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue(), 0)); |
| messages = getMsgReceived(consumer1, 40); |
| assertEquals(messages.size(), 40); |
| ackMessages(consumer1, messages); |
| |
| //set topic-level and namespace-level policy, topic-level should has higher priority |
| admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, maxUnackedMsgNumOnNamespace); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() |
| -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue() |
| , maxUnackedMsgNumOnNamespace)); |
| admin.topics().setMaxUnackedMessagesOnSubscription(topic, maxUnackedMsgOnTopic); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topic))); |
| //check the value applied |
| PersistentTopic persistentTopic = (PersistentTopic)pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| assertEquals(persistentTopic.getMaxUnackedMessagesOnSubscription(), maxUnackedMsgOnTopic); |
| messages = getMsgReceived(consumer1, Integer.MAX_VALUE); |
| assertEquals(messages.size(), maxUnackedMsgOnTopic); |
| ackMessages(consumer1, messages); |
| |
| //remove both namespace-level and topic-level policy, broker-level should take effect |
| admin.namespaces().removeMaxUnackedMessagesPerSubscription(myNamespace); |
| admin.topics().removeMaxUnackedMessagesOnSubscription(topic); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace) == null |
| && admin.topics().getMaxUnackedMessagesOnSubscription(topic) == null); |
| messages = getMsgReceived(consumer1, Integer.MAX_VALUE); |
| assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker); |
| } |
| |
| private void produceMsg(Producer producer, int msgNum) throws Exception{ |
| for (int i = 0; i < msgNum; i++) { |
| producer.send("msg".getBytes()); |
| } |
| } |
| |
| private List<Message<?>> getMsgReceived(Consumer<byte[]> consumer1, int msgNum) throws PulsarClientException { |
| List<Message<?>> messages = new ArrayList<>(); |
| for (int i = 0; i < msgNum; i++) { |
| Message<?> message = consumer1.receive(1000, TimeUnit.MILLISECONDS); |
| if (message == null) { |
| break; |
| } |
| messages.add(message); |
| } |
| return messages; |
| } |
| |
| private void ackMessages(Consumer<?> consumer, List<Message<?>> messages) throws Exception{ |
| for (Message<?> message : messages) { |
| consumer.acknowledge(message); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testMaxSubscriptionsPerTopic() throws Exception { |
| int brokerLevelMaxSub = 4; |
| conf.setMaxSubscriptionsPerTopic(4); |
| restartBroker(); |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| // init cache |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| // Set topic-level max subscriptions |
| final int topicLevelMaxSubNum = 2; |
| admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); |
| |
| List<Consumer<String>> consumerList = new ArrayList<>(); |
| for (int i = 0; i < topicLevelMaxSubNum; i++) { |
| Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()) |
| .topic(topic).subscribe(); |
| consumerList.add(consumer); |
| } |
| try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS) |
| .serviceUrl(brokerUrl.toString()).build()) { |
| consumerList.add(client.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()) |
| .topic(topic).subscribe()); |
| fail("should fail"); |
| } catch (PulsarClientException ignore) { |
| assertEquals(consumerList.size(), topicLevelMaxSubNum); |
| } |
| // Set namespace-level policy, but will not take effect |
| final int namespaceLevelMaxSub = 3; |
| admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, namespaceLevelMaxSub); |
| PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); |
| field.setAccessible(true); |
| Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == namespaceLevelMaxSub); |
| |
| try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS) |
| .serviceUrl(brokerUrl.toString()).build()) { |
| client.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()) |
| .topic(topic).subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException ignore) { |
| assertEquals(consumerList.size(), topicLevelMaxSubNum); |
| } |
| //Removed topic-level policy, namespace-level should take effect |
| admin.topics().removeMaxSubscriptionsPerTopic(topic); |
| consumerList.add(pulsarClient.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe()); |
| assertEquals(consumerList.size(), namespaceLevelMaxSub); |
| try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS) |
| .serviceUrl(brokerUrl.toString()).build()) { |
| consumerList.add(client.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()) |
| .topic(topic).subscribe()); |
| fail("should fail"); |
| } catch (PulsarClientException ignore) { |
| assertEquals(consumerList.size(), namespaceLevelMaxSub); |
| } |
| //Removed namespace-level policy, broker-level should take effect |
| admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); |
| Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null); |
| consumerList.add(pulsarClient.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe()); |
| assertEquals(consumerList.size(), brokerLevelMaxSub); |
| try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS) |
| .serviceUrl(brokerUrl.toString()).build()) { |
| consumerList.add(client.newConsumer(Schema.STRING) |
| .subscriptionName(UUID.randomUUID().toString()) |
| .topic(topic).subscribe()); |
| fail("should fail"); |
| } catch (PulsarClientException ignore) { |
| assertEquals(consumerList.size(), brokerLevelMaxSub); |
| } |
| //Clean up |
| for (Consumer<String> c : consumerList) { |
| c.close(); |
| } |
| } |
| |
| @Test(timeOut = 30000) |
| public void testReplicatorRateApi() throws Exception { |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| // init cache |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| |
| assertNull(admin.topics().getReplicatorDispatchRate(topic)); |
| |
| DispatchRate dispatchRate = new DispatchRate(100,200L,10); |
| admin.topics().setReplicatorDispatchRate(topic, dispatchRate); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() |
| -> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate)); |
| |
| admin.topics().removeReplicatorDispatchRate(topic); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNull(admin.topics().getReplicatorDispatchRate(topic))); |
| } |
| |
| @Test(timeOut = 20000) |
| public void testGetReplicatorRateApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getReplicatorDispatchRate(topic)); |
| assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)); |
| DispatchRate brokerDispatchRate = new DispatchRate( |
| pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg(), |
| pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInByte(), |
| 1 |
| ); |
| assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), brokerDispatchRate); |
| DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12); |
| |
| admin.namespaces().setReplicatorDispatchRate(myNamespace, namespaceDispatchRate); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getReplicatorDispatchRate(myNamespace))); |
| assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), namespaceDispatchRate); |
| |
| DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22); |
| admin.topics().setReplicatorDispatchRate(topic, topicDispatchRate); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getReplicatorDispatchRate(topic))); |
| assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), topicDispatchRate); |
| |
| admin.namespaces().removeReplicatorDispatchRate(myNamespace); |
| admin.topics().removeReplicatorDispatchRate(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getReplicatorDispatchRate(topic))); |
| assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), brokerDispatchRate); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testAutoCreationDisabled() throws Exception { |
| cleanup(); |
| conf.setAllowAutoTopicCreation(false); |
| setup(); |
| final String topic = testTopic + UUID.randomUUID(); |
| admin.topics().createPartitionedTopic(topic, 3); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| //should not fail |
| assertNull(admin.topics().getMessageTTL(topic)); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testSubscriptionTypesEnabled() throws Exception { |
| final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID(); |
| admin.topics().createNonPartitionedTopic(topic); |
| |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| // use broker.conf |
| pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close(); |
| assertNull(admin.topics().getSubscriptionTypesEnabled(topic)); |
| // set enable failover sub type |
| Set<SubscriptionType> subscriptionTypeSet = new HashSet<>(); |
| subscriptionTypeSet.add(SubscriptionType.Failover); |
| admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet); |
| |
| Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() |
| -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null); |
| subscriptionTypeSet = admin.topics().getSubscriptionTypesEnabled(topic); |
| assertTrue(subscriptionTypeSet.contains(SubscriptionType.Failover)); |
| assertFalse(subscriptionTypeSet.contains(SubscriptionType.Shared)); |
| assertEquals(subscriptionTypeSet.size(), 1); |
| try { |
| pulsarClient.newConsumer().topic(topic) |
| .subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe(); |
| fail(); |
| } catch (PulsarClientException pulsarClientException) { |
| assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException); |
| } |
| |
| // add shared type |
| subscriptionTypeSet.add(SubscriptionType.Shared); |
| admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet); |
| pulsarClient.newConsumer().topic(topic) |
| .subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close(); |
| |
| // test namespace and topic policy |
| subscriptionTypeSet.add(SubscriptionType.Shared); |
| admin.namespaces().setSubscriptionTypesEnabled(myNamespace, subscriptionTypeSet); |
| |
| subscriptionTypeSet.clear(); |
| subscriptionTypeSet.add(SubscriptionType.Failover); |
| admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet); |
| |
| try { |
| pulsarClient.newConsumer().topic(topic) |
| .subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe(); |
| fail(); |
| } catch (PulsarClientException pulsarClientException) { |
| assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException); |
| } |
| } |
| |
| @Test(timeOut = 20000) |
| public void testGetCompactionThresholdApplied() throws Exception { |
| final String topic = testTopic + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().atMost(5, TimeUnit.SECONDS) |
| .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); |
| assertNull(admin.topics().getCompactionThreshold(topic)); |
| assertNull(admin.namespaces().getCompactionThreshold(myNamespace)); |
| long brokerPolicy = pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes(); |
| assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy); |
| long namespacePolicy = 10L; |
| |
| admin.namespaces().setCompactionThreshold(myNamespace, namespacePolicy); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getCompactionThreshold(myNamespace))); |
| assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), namespacePolicy); |
| |
| long topicPolicy = 20L; |
| admin.topics().setCompactionThreshold(topic, topicPolicy); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getCompactionThreshold(topic))); |
| assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), topicPolicy); |
| |
| admin.namespaces().removeCompactionThreshold(myNamespace); |
| admin.topics().removeCompactionThreshold(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(myNamespace))); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getCompactionThreshold(topic))); |
| assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy); |
| } |
| |
| } |