| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static org.mockito.Mockito.spy; |
| import static org.testng.AssertJUnit.assertEquals; |
| import static org.testng.AssertJUnit.assertNotNull; |
| import static org.testng.AssertJUnit.assertNull; |
| import static org.testng.AssertJUnit.assertTrue; |
| import java.lang.reflect.Field; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import lombok.Cleanup; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang3.reflect.FieldUtils; |
| import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; |
| import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; |
| import org.apache.pulsar.broker.systopic.SystemTopicClient; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.impl.Backoff; |
| import org.apache.pulsar.client.impl.BackoffBuilder; |
| import org.apache.pulsar.common.events.PulsarEvent; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.common.policies.data.TopicPolicies; |
| import org.assertj.core.api.Assertions; |
| import org.awaitility.Awaitility; |
| import org.mockito.Mockito; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| @Test(groups = "broker") |
| @Slf4j |
| public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest { |
| |
| private static final String NAMESPACE1 = "system-topic/namespace-1"; |
| private static final String NAMESPACE2 = "system-topic/namespace-2"; |
| private static final String NAMESPACE3 = "system-topic/namespace-3"; |
| |
| private static final String NAMESPACE4 = "system-topic/namespace-4"; |
| |
| private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1"); |
| private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2"); |
| private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1"); |
| private static final TopicName TOPIC4 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-2"); |
| private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1"); |
| private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2"); |
| |
| private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService; |
| |
| @BeforeMethod(alwaysRun = true) |
| @Override |
| protected void setup() throws Exception { |
| super.internalSetup(); |
| prepareData(); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| @Test |
| public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException { |
| TopicName topicName = TopicName.get("test"); |
| class TopicPolicyListenerImpl implements TopicPolicyListener<TopicPolicies> { |
| |
| @Override |
| public void onUpdate(TopicPolicies data) { |
| //no op. |
| } |
| } |
| |
| CompletableFuture<Void> f = CompletableFuture.completedFuture(null).thenRunAsync(() -> { |
| for (int i = 0; i < 100; i++) { |
| TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl(); |
| systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); |
| Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); |
| systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener); |
| } |
| }); |
| |
| for (int i = 0; i < 100; i++) { |
| TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl(); |
| systemTopicBasedTopicPoliciesService.registerListener(topicName, listener); |
| Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName)); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1); |
| systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener); |
| } |
| |
| f.get(); |
| //Some system topics will be added to the listeners. Just check if it contains topicName. |
| Assert.assertFalse(systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName)); |
| } |
| |
| @Test |
| public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException { |
| |
| // Init topic policies |
| TopicPolicies initPolicy = TopicPolicies.builder() |
| .maxConsumerPerTopic(10) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); |
| |
| // Wait for all topic policies updated. |
| Awaitility.await().untilAsserted(() -> |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService |
| .getPoliciesCacheInit(TOPIC1.getNamespaceObject()).isDone())); |
| |
| // Assert broker is cache all topic policies |
| Awaitility.await().untilAsserted(() -> |
| Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1) |
| .getMaxConsumerPerTopic().intValue(), 10)); |
| |
| // Update policy for TOPIC1 |
| TopicPolicies policies1 = TopicPolicies.builder() |
| .maxProducerPerTopic(1) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1).get(); |
| |
| // Update policy for TOPIC2 |
| TopicPolicies policies2 = TopicPolicies.builder() |
| .maxProducerPerTopic(2) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2).get(); |
| |
| // Update policy for TOPIC3 |
| TopicPolicies policies3 = TopicPolicies.builder() |
| .maxProducerPerTopic(3) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC3, policies3).get(); |
| |
| // Update policy for TOPIC4 |
| TopicPolicies policies4 = TopicPolicies.builder() |
| .maxProducerPerTopic(4) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC4, policies4).get(); |
| |
| // Update policy for TOPIC5 |
| TopicPolicies policies5 = TopicPolicies.builder() |
| .maxProducerPerTopic(5) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC5, policies5).get(); |
| |
| // Update policy for TOPIC6 |
| TopicPolicies policies6 = TopicPolicies.builder() |
| .maxProducerPerTopic(6) |
| .build(); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get(); |
| |
| Awaitility.await().untilAsserted(() -> { |
| TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); |
| TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); |
| TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3); |
| TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4); |
| TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5); |
| TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6); |
| |
| Assert.assertEquals(policiesGet1, policies1); |
| Assert.assertEquals(policiesGet2, policies2); |
| Assert.assertEquals(policiesGet3, policies3); |
| Assert.assertEquals(policiesGet4, policies4); |
| Assert.assertEquals(policiesGet5, policies5); |
| Assert.assertEquals(policiesGet6, policies6); |
| }); |
| |
| // Remove reader cache will remove policies cache |
| Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6); |
| |
| // Check reader cache is correct. |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); |
| |
| policies1.setMaxProducerPerTopic(101); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); |
| policies2.setMaxProducerPerTopic(102); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); |
| policies2.setMaxProducerPerTopic(103); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); |
| policies1.setMaxProducerPerTopic(104); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); |
| policies2.setMaxProducerPerTopic(105); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC2, policies2); |
| policies1.setMaxProducerPerTopic(106); |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1); |
| |
| // reader for NAMESPACE1 will back fill the reader cache |
| Awaitility.await().untilAsserted(() -> { |
| TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1); |
| TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2); |
| Assert.assertEquals(policies1, policiesGet1); |
| Assert.assertEquals(policies2, policiesGet2); |
| }); |
| |
| // Check reader cache is correct. |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE1))); |
| Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3))); |
| |
| // Check get without cache |
| TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get(); |
| Assert.assertEquals(policies1, policiesGet1); |
| } |
| |
| @Test |
| public void testCacheCleanup() throws Exception { |
| final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID(); |
| TopicName topicName = TopicName.get(topic); |
| admin.topics().createPartitionedTopic(topic, 3); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| admin.topics().setMaxConsumers(topic, 1000); |
| Awaitility.await().untilAsserted(() -> |
| assertNotNull(admin.topics().getMaxConsumers(topic))); |
| Map<TopicName, TopicPolicies> map = systemTopicBasedTopicPoliciesService.getPoliciesCache(); |
| Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listMap = |
| systemTopicBasedTopicPoliciesService.getListeners(); |
| assertNotNull(map.get(topicName)); |
| assertEquals(map.get(topicName).getMaxConsumerPerTopic().intValue(), 1000); |
| assertNotNull(listMap.get(topicName).get(0)); |
| |
| admin.topics().deletePartitionedTopic(topic, true); |
| admin.namespaces().unload(NAMESPACE1); |
| assertNull(map.get(topicName)); |
| assertNull(listMap.get(topicName)); |
| } |
| |
| @Test |
| public void testListenerCleanupByPartition() throws Exception { |
| final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID(); |
| TopicName topicName = TopicName.get(topic); |
| admin.topics().createPartitionedTopic(topic, 3); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| |
| Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listMap = |
| systemTopicBasedTopicPoliciesService.getListeners(); |
| Awaitility.await().untilAsserted(() -> { |
| // all 3 topic partition have registered the topic policy listeners. |
| assertEquals(listMap.get(topicName).size(), 3); |
| }); |
| |
| admin.topics().unload(topicName.getPartition(0).toString()); |
| assertEquals(listMap.get(topicName).size(), 2); |
| admin.topics().unload(topicName.getPartition(1).toString()); |
| assertEquals(listMap.get(topicName).size(), 1); |
| admin.topics().unload(topicName.getPartition(2).toString()); |
| assertNull(listMap.get(topicName)); |
| } |
| |
| |
| |
| private void prepareData() throws PulsarAdminException { |
| admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); |
| admin.tenants().createTenant("system-topic", |
| new TenantInfoImpl(new HashSet<>(), Set.of("test"))); |
| admin.namespaces().createNamespace(NAMESPACE1); |
| admin.namespaces().createNamespace(NAMESPACE2); |
| admin.namespaces().createNamespace(NAMESPACE3); |
| admin.lookups().lookupTopic(TOPIC1.toString()); |
| admin.lookups().lookupTopic(TOPIC2.toString()); |
| admin.lookups().lookupTopic(TOPIC3.toString()); |
| admin.lookups().lookupTopic(TOPIC4.toString()); |
| admin.lookups().lookupTopic(TOPIC5.toString()); |
| admin.lookups().lookupTopic(TOPIC6.toString()); |
| systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); |
| } |
| |
| @Test |
| public void testGetPolicyTimeout() throws Exception { |
| SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); |
| Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()).isDone())); |
| service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), new CompletableFuture<>()); |
| long start = System.currentTimeMillis(); |
| Backoff backoff = new BackoffBuilder() |
| .setInitialTime(500, TimeUnit.MILLISECONDS) |
| .setMandatoryStop(5000, TimeUnit.MILLISECONDS) |
| .setMax(1000, TimeUnit.MILLISECONDS) |
| .create(); |
| try { |
| service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor(), false).get(); |
| } catch (Exception e) { |
| assertTrue(e.getCause() instanceof TopicPoliciesCacheNotInitException); |
| } |
| long cost = System.currentTimeMillis() - start; |
| assertTrue("actual:" + cost, cost >= 5000 - 1000); |
| } |
| |
| @Test |
| public void testGetTopicPoliciesWithRetry() throws Exception { |
| Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap"); |
| initMapField.setAccessible(true); |
| Map<NamespaceName, Boolean> initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService); |
| initMap.remove(NamespaceName.get(NAMESPACE1)); |
| Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches"); |
| readerCaches.setAccessible(true); |
| Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService); |
| readers.remove(NamespaceName.get(NAMESPACE1)); |
| Backoff backoff = new BackoffBuilder() |
| .setInitialTime(500, TimeUnit.MILLISECONDS) |
| .setMandatoryStop(5000, TimeUnit.MILLISECONDS) |
| .setMax(1000, TimeUnit.MILLISECONDS) |
| .create(); |
| TopicPolicies initPolicy = TopicPolicies.builder() |
| .maxConsumerPerTopic(10) |
| .build(); |
| @Cleanup("shutdownNow") |
| ScheduledExecutorService executors = Executors.newScheduledThreadPool(1); |
| executors.schedule(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get(); |
| } catch (Exception ignore) {} |
| } |
| }, 2000, TimeUnit.MILLISECONDS); |
| Awaitility.await().untilAsserted(() -> { |
| Optional<TopicPolicies> topicPolicies = systemTopicBasedTopicPoliciesService |
| .getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor(), false).get(); |
| Assert.assertTrue(topicPolicies.isPresent()); |
| if (topicPolicies.isPresent()) { |
| Assert.assertEquals(topicPolicies.get(), initPolicy); |
| } |
| }); |
| } |
| |
| @Test |
| public void testHandleNamespaceBeingDeleted() throws Exception { |
| SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); |
| pulsar.getPulsarResources().getNamespaceResources().setPolicies(NamespaceName.get(NAMESPACE1), |
| old -> { |
| old.deleted = true; |
| return old; |
| }); |
| service.deleteTopicPoliciesAsync(TOPIC1).get(); |
| } |
| |
| @Test |
| public void testGetTopicPoliciesWithCleanCache() throws Exception { |
| final String topic = "persistent://" + NAMESPACE1 + "/test" + UUID.randomUUID(); |
| pulsarClient.newProducer().topic(topic).create().close(); |
| |
| SystemTopicBasedTopicPoliciesService topicPoliciesService = |
| (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); |
| |
| ConcurrentHashMap<TopicName, TopicPolicies> spyPoliciesCache = spy(new ConcurrentHashMap<TopicName, TopicPolicies>()); |
| FieldUtils.writeDeclaredField(topicPoliciesService, "policiesCache", spyPoliciesCache, true); |
| |
| Awaitility.await().untilAsserted(() -> { |
| Assertions.assertThat(topicPoliciesService.getTopicPolicies(TopicName.get(topic))).isNull(); |
| }); |
| |
| admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); |
| Awaitility.await().untilAsserted(() -> { |
| Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); |
| }); |
| |
| Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers = |
| (Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>) |
| FieldUtils.readDeclaredField(topicPoliciesService, "readerCaches", true); |
| |
| Mockito.doAnswer(invocation -> { |
| Thread.sleep(1000); |
| return invocation.callRealMethod(); |
| }).when(spyPoliciesCache).get(Mockito.any()); |
| |
| CompletableFuture<Void> result = new CompletableFuture<>(); |
| Thread thread = new Thread(() -> { |
| TopicPolicies topicPolicies; |
| for (int i = 0; i < 10; i++) { |
| try { |
| topicPolicies = topicPoliciesService.getTopicPolicies(TopicName.get(topic)); |
| Assert.assertNotNull(topicPolicies); |
| Thread.sleep(500); |
| } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { |
| log.warn("topic policies cache not init, retry..."); |
| } catch (Throwable e) { |
| log.error("ops: ", e); |
| result.completeExceptionally(e); |
| return; |
| } |
| } |
| result.complete(null); |
| }); |
| |
| Thread thread2 = new Thread(() -> { |
| for (int i = 0; i < 10; i++) { |
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = |
| readers.get(TopicName.get(topic).getNamespaceObject()); |
| if (readerCompletableFuture != null) { |
| readerCompletableFuture.join().closeAsync().join(); |
| } |
| } |
| }); |
| |
| thread.start(); |
| thread2.start(); |
| |
| thread.join(); |
| thread2.join(); |
| |
| result.join(); |
| } |
| |
| @Test |
| public void testWriterCache() throws Exception { |
| admin.namespaces().createNamespace(NAMESPACE4); |
| for (int i = 1; i <= 5; i ++) { |
| final String topicName = "persistent://" + NAMESPACE4 + "/testWriterCache" + i; |
| admin.topics().createNonPartitionedTopic(topicName); |
| pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close(); |
| } |
| @Cleanup("shutdown") |
| ExecutorService executorService = Executors.newFixedThreadPool(5); |
| for (int i = 1; i <= 5; i ++) { |
| int finalI = i; |
| executorService.execute(() -> { |
| final String topicName = "persistent://" + NAMESPACE4 + "/testWriterCache" + finalI; |
| try { |
| admin.topicPolicies().setMaxConsumers(topicName, 2); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); |
| Assert.assertNotNull(service.getWriterCaches().synchronous().get(NamespaceName.get(NAMESPACE4))); |
| for (int i = 1; i <= 5; i ++) { |
| final String topicName = "persistent://" + NAMESPACE4 + "/testWriterCache" + i; |
| admin.topics().delete(topicName); |
| } |
| admin.namespaces().deleteNamespace(NAMESPACE4); |
| Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4))); |
| } |
| } |