| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import com.google.common.collect.Sets; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.UUID; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.DispatchRate; |
| import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; |
| import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; |
| import org.apache.pulsar.common.policies.data.PersistencePolicies; |
| import org.apache.pulsar.common.policies.data.PublishRate; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.SubscribeRate; |
| import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; |
| import org.awaitility.Awaitility; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| /** |
| * Starts 3 brokers that are in 3 different clusters |
| */ |
| @Test(groups = "broker") |
| public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase { |
| |
| @Override |
| @BeforeClass(alwaysRun = true, timeOut = 300000) |
| public void setup() throws Exception { |
| config1.setSystemTopicEnabled(true); |
| config1.setDefaultNumberOfNamespaceBundles(1); |
| config1.setTopicLevelPoliciesEnabled(true); |
| config2.setSystemTopicEnabled(true); |
| config2.setTopicLevelPoliciesEnabled(true); |
| config2.setDefaultNumberOfNamespaceBundles(1); |
| config3.setSystemTopicEnabled(true); |
| config3.setTopicLevelPoliciesEnabled(true); |
| config3.setDefaultNumberOfNamespaceBundles(1); |
| super.setup(); |
| } |
| |
| @Override |
| @AfterClass(alwaysRun = true, timeOut = 300000) |
| public void cleanup() throws Exception { |
| super.cleanup(); |
| } |
| |
| @Test |
| public void testReplicateQuotaTopicPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set BacklogQuota |
| BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl(); |
| backlogQuota.setLimitSize(1); |
| backlogQuota.setLimitTime(2); |
| admin1.topicPolicies(true).setBacklogQuota(topic, backlogQuota); |
| |
| Awaitility.await().untilAsserted(() -> |
| assertTrue(admin2.topicPolicies(true).getBacklogQuotaMap(topic).containsValue(backlogQuota))); |
| Awaitility.await().untilAsserted(() -> |
| assertTrue(admin3.topicPolicies(true).getBacklogQuotaMap(topic).containsValue(backlogQuota))); |
| //remove BacklogQuota |
| admin1.topicPolicies(true).removeBacklogQuota(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getBacklogQuotaMap(topic).size(), 0)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getBacklogQuotaMap(topic).size(), 0)); |
| } |
| |
| @Test |
| public void testReplicateMessageTTLPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set message ttl |
| admin1.topicPolicies(true).setMessageTTL(topic, 10); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), 10)); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getMessageTTL(topic).intValue(), 10)); |
| //remove message ttl |
| admin1.topicPolicies(true).removeMessageTTL(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getMessageTTL(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getMessageTTL(topic))); |
| } |
| |
| @Test |
| public void testReplicateSubscribeRatePolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set global topic policy |
| SubscribeRate subscribeRate = new SubscribeRate(100, 10000); |
| admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate); |
| |
| // get global topic policy |
| untilRemoteClustersAsserted( |
| admin -> assertEquals(admin.topicPolicies(true).getSubscribeRate(topic), subscribeRate)); |
| |
| // remove global topic policy |
| admin1.topicPolicies(true).removeSubscribeRate(topic); |
| untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getSubscribeRate(topic))); |
| } |
| |
| @Test |
| public void testReplicatePublishRatePolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set global topic policy |
| PublishRate publishRate = new PublishRate(100, 10000); |
| admin1.topicPolicies(true).setPublishRate(topic, publishRate); |
| |
| // get global topic policy |
| untilRemoteClustersAsserted( |
| admin -> assertEquals(admin.topicPolicies(true).getPublishRate(topic), publishRate)); |
| |
| // remove global topic policy |
| admin1.topicPolicies(true).removePublishRate(topic); |
| untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getPublishRate(topic))); |
| } |
| |
| private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> condition) { |
| Awaitility.await().untilAsserted(() -> condition.apply(admin2)); |
| Awaitility.await().untilAsserted(() -> condition.apply(admin3)); |
| } |
| |
| private interface ThrowingConsumer<I> { |
| void apply(I input) throws Throwable; |
| } |
| |
| |
| @Test |
| public void testReplicatePersistentPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set PersistencePolicies |
| PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000); |
| admin1.topicPolicies(true).setPersistence(topic, policies); |
| |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getPersistence(topic), policies)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getPersistence(topic), policies)); |
| //remove PersistencePolicies |
| admin1.topicPolicies(true).removePersistence(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getPersistence(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getPersistence(topic))); |
| } |
| |
| @Test |
| public void testReplicateDeduplicationStatusPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set subscription types policies |
| admin1.topicPolicies(true).setDeduplicationStatus(topic, true); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic))); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertTrue(admin3.topicPolicies(true).getDeduplicationStatus(topic))); |
| // remove subscription types policies |
| admin1.topicPolicies(true).removeDeduplicationStatus(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic))); |
| |
| } |
| |
| @Test |
| public void testReplicatorMaxProducer() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| |
| // set max producer policies |
| admin1.topicPolicies(true).setMaxProducers(topic, 100); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getMaxProducers(topic).intValue(), 100)); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getMaxProducers(topic).intValue(), 100)); |
| |
| // remove max producer policies |
| admin1.topicPolicies(true).removeMaxProducers(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getMaxProducers(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getMaxProducers(topic))); |
| } |
| |
| @Test |
| public void testReplicatorMaxConsumerPerSubPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| |
| init(namespace, topic); |
| // set max consumer per sub |
| admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100)); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100)); |
| |
| Awaitility.await().untilAsserted(() -> { |
| assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100); |
| assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic)); |
| }); |
| |
| //remove max consumer per sub |
| admin1.topicPolicies(true).removeMaxConsumersPerSubscription(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic))); |
| } |
| |
| @Test |
| public void testReplicateMaxUnackedMsgPerConsumer() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set max unacked msgs per consumers |
| admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100)); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100)); |
| // remove max unacked msgs per consumers |
| admin1.topicPolicies(true).removeMaxUnackedMessagesOnConsumer(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic))); |
| } |
| |
| @Test |
| public void testReplicatorTopicPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| |
| init(namespace, persistentTopicName); |
| // set retention |
| RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1); |
| admin1.topicPolicies(true).setRetention(persistentTopicName, retentionPolicies); |
| |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies)); |
| |
| Awaitility.await().untilAsserted(() -> { |
| assertEquals(admin1.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies); |
| assertNull(admin1.topicPolicies().getRetention(persistentTopicName)); |
| }); |
| |
| //remove retention |
| admin1.topicPolicies(true).removeRetention(persistentTopicName); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getRetention(persistentTopicName))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getRetention(persistentTopicName))); |
| } |
| @Test |
| public void testReplicateSubscriptionTypesPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| Set<SubscriptionType> subscriptionTypes = new HashSet<>(); |
| subscriptionTypes.add(SubscriptionType.Shared); |
| // set subscription types policies |
| admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, subscriptionTypes); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes)); |
| // remove subscription types policies |
| admin1.topicPolicies(true).removeSubscriptionTypesEnabled(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet())); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet())); |
| |
| } |
| |
| |
| @Test |
| public void testReplicateMaxConsumers() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set max consumers |
| admin1.topicPolicies(true).setMaxConsumers(topic, 100); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getMaxConsumers(topic).intValue(), 100)); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getMaxConsumers(topic).intValue(), 100)); |
| // remove max consumers |
| admin1.topicPolicies(true).removeMaxConsumers(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getMaxConsumers(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getMaxConsumers(topic))); |
| } |
| |
| @Test |
| public void testReplicatorMessageDispatchRatePolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| |
| init(namespace, persistentTopicName); |
| // set dispatchRate |
| DispatchRate dispatchRate = DispatchRate.builder() |
| .dispatchThrottlingRateInMsg(1) |
| .dispatchThrottlingRateInMsg(2) |
| .ratePeriodInSecond(3) |
| .relativeToPublishRate(true) |
| .build(); |
| admin1.topicPolicies(true).setDispatchRate(persistentTopicName, dispatchRate); |
| |
| // get dispatchRate |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getDispatchRate(persistentTopicName), dispatchRate)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getDispatchRate(persistentTopicName), dispatchRate)); |
| |
| //remove dispatchRate |
| admin1.topicPolicies(true).removeDispatchRate(persistentTopicName); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getDispatchRate(persistentTopicName))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName))); |
| } |
| |
| @Test |
| public void testReplicatorInactiveTopicPolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, persistentTopicName); |
| |
| // set InactiveTopicPolicies |
| InactiveTopicPolicies inactiveTopicPolicies = |
| new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true); |
| admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true) |
| .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true) |
| .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies)); |
| // remove InactiveTopicPolicies |
| admin1.topicPolicies(true).removeInactiveTopicPolicies(persistentTopicName); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName))); |
| } |
| |
| @Test |
| public void testReplicatorSubscriptionDispatchRatePolicies() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| |
| init(namespace, persistentTopicName); |
| // set subscription dispatch rate |
| DispatchRate dispatchRate = DispatchRate.builder() |
| .dispatchThrottlingRateInMsg(1) |
| .ratePeriodInSecond(1) |
| .dispatchThrottlingRateInByte(1) |
| .relativeToPublishRate(true) |
| .build(); |
| admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, dispatchRate); |
| // get subscription dispatch rate |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true) |
| .getSubscriptionDispatchRate(persistentTopicName), dispatchRate)); |
| Awaitility.await().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true) |
| .getSubscriptionDispatchRate(persistentTopicName), dispatchRate)); |
| |
| //remove subscription dispatch rate |
| admin1.topicPolicies(true).removeSubscriptionDispatchRate(persistentTopicName); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName))); |
| } |
| @Test |
| public void testReplicateMaxUnackedMsgPerSub() throws Exception { |
| final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID(); |
| final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID(); |
| init(namespace, topic); |
| // set max unacked msgs per sub |
| admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100)); |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> |
| assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100)); |
| // remove max unacked msgs per sub |
| admin1.topicPolicies(true).removeMaxUnackedMessagesOnSubscription(topic); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic))); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic))); |
| } |
| |
| private void init(String namespace, String topic) |
| throws PulsarAdminException, PulsarClientException, PulsarServerException { |
| final String cluster2 = pulsar2.getConfig().getClusterName(); |
| final String cluster1 = pulsar1.getConfig().getClusterName(); |
| final String cluster3 = pulsar3.getConfig().getClusterName(); |
| |
| admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3)); |
| admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); |
| // Create partitioned-topic from R1 |
| admin1.topics().createPartitionedTopic(topic, 3); |
| // List partitioned topics from R2 |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace))); |
| Awaitility.await().untilAsserted(() -> assertEquals( |
| admin2.topics().getPartitionedTopicList(namespace).get(0), topic)); |
| assertEquals(admin1.topics().getList(namespace).size(), 3); |
| // List partitioned topics from R3 |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace))); |
| Awaitility.await().untilAsserted(() -> assertEquals( |
| admin3.topics().getPartitionedTopicList(namespace).get(0), topic)); |
| |
| pulsar1.getClient().newProducer().topic(topic).create().close(); |
| pulsar2.getClient().newProducer().topic(topic).create().close(); |
| pulsar3.getClient().newProducer().topic(topic).create().close(); |
| |
| //init topic policies server |
| Awaitility.await().ignoreExceptions().untilAsserted(() -> { |
| assertNull(pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))); |
| assertNull(pulsar2.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))); |
| assertNull(pulsar3.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))); |
| }); |
| } |
| |
| } |