| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; |
| import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; |
| import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.ClusterDataImpl; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.common.policies.data.TopicPolicies; |
| import org.awaitility.Awaitility; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutionException; |
| import static org.testng.AssertJUnit.assertEquals; |
| import static org.testng.AssertJUnit.assertNotNull; |
| import static org.testng.AssertJUnit.assertNull; |
| |
| @Test(groups = "broker") |
| public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest { |
| |
| private static final String NAMESPACE1 = "system-topic/namespace-1"; |
| private static final String NAMESPACE2 = "system-topic/namespace-2"; |
| private static final String NAMESPACE3 = "system-topic/namespace-3"; |
| |
| private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1"); |
| private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2"); |
| private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1"); |
| private static final TopicName TOPIC4 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-2"); |
| private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1"); |
| private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2"); |
| |
| private NamespaceEventsSystemTopicFactory systemTopicFactory; |
| private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService; |
| |
| @BeforeMethod(alwaysRun = true) |
| @Override |
| protected void setup() throws Exception { |
| conf.setSystemTopicEnabled(true); |
| conf.setTopicLevelPoliciesEnabled(true); |
| super.internalSetup(); |
| prepareData(); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| @Test |
| public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException { |
| |
| // Init topic policies |
| TopicPolicies initPolicy = TopicPolicies.builder() |
| .maxConsumerPerTopic(10) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); |
| |
| // Wait for all topic policies updated. |
| Awaitility.await().untilAsserted(() -> |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService |
| .getPoliciesCacheInit(TOPIC1.getNamespaceObject()))); |
| |
| // Assert broker is cache all topic policies |
| Awaitility.await().untilAsserted(() -> |
| Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1) |
| .getMaxConsumerPerTopic().intValue(), 10)); |
| |
| // Update policy for TOPIC1 |
| TopicPolicies policies1 = TopicPolicies.builder() |
| .maxProducerPerTopic(1) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1).get(); |
| |
| // Update policy for TOPIC2 |
| TopicPolicies policies2 = TopicPolicies.builder() |
| .maxProducerPerTopic(2) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2).get(); |
| |
| // Update policy for TOPIC3 |
| TopicPolicies policies3 = TopicPolicies.builder() |
| .maxProducerPerTopic(3) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, policies3).get(); |
| |
| // Update policy for TOPIC4 |
| TopicPolicies policies4 = TopicPolicies.builder() |
| .maxProducerPerTopic(4) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, policies4).get(); |
| |
| // Update policy for TOPIC5 |
| TopicPolicies policies5 = TopicPolicies.builder() |
| .maxProducerPerTopic(5) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, policies5).get(); |
| |
| // Update policy for TOPIC6 |
| TopicPolicies policies6 = TopicPolicies.builder() |
| .maxProducerPerTopic(6) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); |
| |
| Awaitility.await().untilAsserted(() -> { |
| TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); |
| TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); |
| TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); |
| TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); |
| TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); |
| TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); |
| |
| Assert.assertEquals(policiesGet1, policies1); |
| Assert.assertEquals(policiesGet2, policies2); |
| Assert.assertEquals(policiesGet3, policies3); |
| Assert.assertEquals(policiesGet4, policies4); |
| Assert.assertEquals(policiesGet5, policies5); |
| Assert.assertEquals(policiesGet6, policies6); |
| }); |
| |
| // Remove reader cache will remove policies cache |
| Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6); |
| |
| // Check reader cache is correct. |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); |
| |
| policies1.setMaxProducerPerTopic(101); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); |
| policies2.setMaxProducerPerTopic(102); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); |
| policies2.setMaxProducerPerTopic(103); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); |
| policies1.setMaxProducerPerTopic(104); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); |
| policies2.setMaxProducerPerTopic(105); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); |
| policies1.setMaxProducerPerTopic(106); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); |
| |
| // reader for NAMESPACE1 will back fill the reader cache |
| Awaitility.await().untilAsserted(() -> { |
| TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); |
| TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); |
| Assert.assertEquals(policies1, policiesGet1); |
| Assert.assertEquals(policies2, policiesGet2); |
| }); |
| |
| // Check reader cache is correct. |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); |
| |
| // Check get without cache |
| TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); |
| Assert.assertEquals(policies1, policiesGet1); |
| } |
| |
| @Test |
| public void testCacheCleanup() throws Exception { |
| final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID(); |
| TopicName topicName = TopicName.get(topic); |
| admin.topics().createPartitionedTopic(topic, 3); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| Awaitility.await().untilAsserted(() |
| -> systemTopicBasedTopicPoliciesService.cacheIsInitialized(topicName)); |
| admin.topics().setMaxConsumers(topic, 1000); |
| Awaitility.await().untilAsserted(() -> |
| assertNotNull(admin.topics().getMaxConsumers(topic))); |
| Map<TopicName, TopicPolicies> map = systemTopicBasedTopicPoliciesService.getPoliciesCache(); |
| Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listMap = |
| systemTopicBasedTopicPoliciesService.getListeners(); |
| assertNotNull(map.get(topicName)); |
| assertEquals(map.get(topicName).getMaxConsumerPerTopic().intValue(), 1000); |
| assertNotNull(listMap.get(topicName).get(0)); |
| |
| admin.topics().deletePartitionedTopic(topic, true); |
| admin.namespaces().unload(NAMESPACE1); |
| assertNull(map.get(topicName)); |
| assertNull(listMap.get(topicName)); |
| } |
| |
| private void prepareData() throws PulsarAdminException { |
| admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); |
| admin.tenants().createTenant("system-topic", |
| new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test"))); |
| admin.namespaces().createNamespace(NAMESPACE1); |
| admin.namespaces().createNamespace(NAMESPACE2); |
| admin.namespaces().createNamespace(NAMESPACE3); |
| admin.lookups().lookupTopic(TOPIC1.toString()); |
| admin.lookups().lookupTopic(TOPIC2.toString()); |
| admin.lookups().lookupTopic(TOPIC3.toString()); |
| admin.lookups().lookupTopic(TOPIC4.toString()); |
| admin.lookups().lookupTopic(TOPIC5.toString()); |
| admin.lookups().lookupTopic(TOPIC6.toString()); |
| systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); |
| systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); |
| } |
| } |