| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.admin; |
| |
| import static org.apache.commons.lang3.StringUtils.isBlank; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNotEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import java.lang.reflect.Field; |
| import java.net.URL; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import javax.ws.rs.core.Response.Status; |
| import lombok.Cleanup; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.mledger.ManagedLedger; |
| import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; |
| import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; |
| import org.apache.pulsar.broker.BrokerTestUtil; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService; |
| import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; |
| import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; |
| import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.persistent.PersistentSubscription; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.MessageRoutingMode; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.ProxyProtocol; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.SubscriptionInitialPosition; |
| import org.apache.pulsar.client.api.SubscriptionType; |
| import org.apache.pulsar.client.impl.MessageIdImpl; |
| import org.apache.pulsar.common.naming.TopicDomain; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; |
| import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; |
| import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.ConsumerStats; |
| import org.apache.pulsar.common.policies.data.FailureDomain; |
| import org.apache.pulsar.common.policies.data.NamespaceIsolationData; |
| import org.apache.pulsar.common.policies.data.PartitionedTopicStats; |
| import org.apache.pulsar.common.policies.data.PersistencePolicies; |
| import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.SubscriptionStats; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.awaitility.Awaitility; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| |
| @Slf4j |
| @Test(groups = "broker") |
| public class AdminApiTest2 extends MockedPulsarServiceBaseTest { |
| |
| private MockedPulsarService mockPulsarSetup; |
| |
| @BeforeMethod |
| @Override |
| public void setup() throws Exception { |
| conf.setLoadBalancerEnabled(true); |
| conf.setEnableNamespaceIsolationUpdateOnTime(true); |
| super.internalSetup(); |
| |
| // create otherbroker to test redirect on calls that need |
| // namespace ownership |
| mockPulsarSetup = new MockedPulsarService(this.conf); |
| mockPulsarSetup.setup(); |
| |
| // Setup namespaces |
| admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop-xyz", tenantInfo); |
| admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test")); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| public void cleanup() throws Exception { |
| super.internalCleanup(); |
| if (mockPulsarSetup != null) { |
| mockPulsarSetup.cleanup(); |
| } |
| resetConfig(); |
| } |
| |
| @DataProvider(name = "topicType") |
| public Object[][] topicTypeProvider() { |
| return new Object[][] { { TopicDomain.persistent.value() }, { TopicDomain.non_persistent.value() } }; |
| } |
| |
| @DataProvider(name = "namespaceNames") |
| public Object[][] namespaceNameProvider() { |
| return new Object[][] { { "ns1" }, { "global" } }; |
| } |
| |
| /** |
| * <pre> |
| * It verifies increasing partitions for partitioned-topic. |
| * 1. create a partitioned-topic |
| * 2. update partitions with larger number of partitions |
| * 3. verify: getPartitionedMetadata and check number of partitions |
| * 4. verify: this api creates existing subscription to new partitioned-topics |
| * so, message will not be lost in new partitions |
| * a. start producer and produce messages |
| * b. check existing subscription for new topics and it should have backlog msgs |
| * |
| * </pre> |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testIncrementPartitionsOfTopic() throws Exception { |
| final String topicName = "increment-partitionedTopic"; |
| final String subName1 = topicName + "-my-sub-1/encode"; |
| final String subName2 = topicName + "-my-sub-2/encode"; |
| final int startPartitions = 4; |
| final int newPartitions = 8; |
| final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName; |
| |
| URL pulsarUrl = new URL(pulsar.getWebServiceAddress()); |
| |
| admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions); |
| // validate partition topic is created |
| assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, |
| startPartitions); |
| |
| // create consumer and subscriptions : check subscriptions |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); |
| Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1) |
| .subscriptionType(SubscriptionType.Shared).subscribe(); |
| assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList(subName1)); |
| Consumer<byte[]> consumer2 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName2) |
| .subscriptionType(SubscriptionType.Shared).subscribe(); |
| assertEquals(Sets.newHashSet(admin.topics().getSubscriptions(partitionedTopicName)), |
| Sets.newHashSet(subName1, subName2)); |
| |
| // (1) update partitions |
| admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions); |
| // verify new partitions have been created |
| assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, |
| newPartitions); |
| // (2) No Msg loss: verify new partitions have the same existing subscription names |
| final String newPartitionTopicName = TopicName.get(partitionedTopicName).getPartition(startPartitions + 1) |
| .toString(); |
| |
| // (3) produce messages to all partitions including newly created partitions (RoundRobin) |
| Producer<byte[]> producer = client.newProducer() |
| .topic(partitionedTopicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) |
| .create(); |
| final int totalMessages = newPartitions * 2; |
| for (int i = 0; i < totalMessages; i++) { |
| String message = "message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| // (4) verify existing subscription has not lost any message: create new consumer with sub-2: it will load all |
| // newly created partition topics |
| consumer2.close(); |
| consumer2 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName2) |
| .subscriptionType(SubscriptionType.Shared).subscribe(); |
| // sometime: mockZk fails to refresh ml-cache: so, invalidate the cache to get fresh data |
| pulsar.getLocalZkCacheService().managedLedgerListCache().clearTree(); |
| assertEquals(Sets.newHashSet(admin.topics().getSubscriptions(newPartitionTopicName)), |
| Sets.newHashSet(subName1, subName2)); |
| |
| assertEquals(Sets.newHashSet(admin.topics().getList("prop-xyz/ns1")).size(), newPartitions); |
| |
| // test cumulative stats for partitioned topic |
| PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false); |
| assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList(subName1, subName2))); |
| assertEquals(topicStats.subscriptions.get(subName2).consumers.size(), 1); |
| assertEquals(topicStats.subscriptions.get(subName2).msgBacklog, totalMessages); |
| assertEquals(topicStats.publishers.size(), 1); |
| assertEquals(topicStats.partitions, Maps.newHashMap()); |
| |
| // (5) verify: each partition should have backlog |
| topicStats = admin.topics().getPartitionedStats(partitionedTopicName, true); |
| assertEquals(topicStats.metadata.partitions, newPartitions); |
| Set<String> partitionSet = Sets.newHashSet(); |
| for (int i = 0; i < newPartitions; i++) { |
| partitionSet.add(partitionedTopicName + "-partition-" + i); |
| } |
| assertEquals(topicStats.partitions.keySet(), partitionSet); |
| for (int i = 0; i < newPartitions; i++) { |
| TopicStats partitionStats = topicStats.partitions |
| .get(TopicName.get(partitionedTopicName).getPartition(i).toString()); |
| assertEquals(partitionStats.publishers.size(), 1); |
| assertEquals(partitionStats.subscriptions.get(subName2).consumers.size(), 1); |
| assertEquals(partitionStats.subscriptions.get(subName2).msgBacklog, 2, 1); |
| } |
| |
| producer.close(); |
| consumer1.close(); |
| consumer2.close(); |
| consumer2.close(); |
| } |
| |
| @Test |
| public void testTopicPoliciesWithMultiBroker() throws Exception { |
| //setup cluster with 3 broker |
| cleanup(); |
| conf.setSystemTopicEnabled(true); |
| conf.setTopicLevelPoliciesEnabled(true); |
| super.internalSetup(); |
| admin.clusters().createCluster("test" |
| , new ClusterData(pulsar.getWebServiceAddress() + ",localhost:1026," + "localhost:2050")); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop-xyz", tenantInfo); |
| admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test")); |
| conf.setBrokerServicePort(Optional.of(1024)); |
| conf.setBrokerServicePortTls(Optional.of(1025)); |
| conf.setWebServicePort(Optional.of(1026)); |
| conf.setWebServicePortTls(Optional.of(1027)); |
| @Cleanup |
| PulsarService pulsar2 = startBrokerWithoutAuthorization(conf); |
| conf.setBrokerServicePort(Optional.of(2048)); |
| conf.setBrokerServicePortTls(Optional.of(2049)); |
| conf.setWebServicePort(Optional.of(2050)); |
| conf.setWebServicePortTls(Optional.of(2051)); |
| @Cleanup |
| PulsarService pulsar3 = startBrokerWithoutAuthorization(conf); |
| @Cleanup |
| PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build(); |
| @Cleanup |
| PulsarAdmin admin3 = PulsarAdmin.builder().serviceHttpUrl(pulsar3.getWebServiceAddress()).build(); |
| |
| //for partitioned topic, we can get topic policies from every broker |
| final String topic = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("test"); |
| int partitionNum = 3; |
| admin.topics().createPartitionedTopic(topic, partitionNum); |
| pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close(); |
| TopicName topicName = TopicName.get(topic); |
| Awaitility.await().until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)); |
| |
| setTopicPoliciesAndValidate(admin2, admin3, topic); |
| //for non-partitioned topic, we can get topic policies from every broker |
| final String topic2 = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("test"); |
| pulsarClient.newConsumer().topic(topic2).subscriptionName("sub").subscribe().close(); |
| setTopicPoliciesAndValidate(admin2, admin3, topic2); |
| } |
| |
| private void setTopicPoliciesAndValidate(PulsarAdmin admin2 |
| , PulsarAdmin admin3, String topic) throws Exception { |
| admin.topics().setMaxUnackedMessagesOnConsumer(topic, 100); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic))); |
| admin.topics().setMaxConsumers(topic, 101); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic))); |
| admin.topics().setMaxProducers(topic, 102); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxProducers(topic))); |
| |
| assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100); |
| assertEquals(admin2.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100); |
| assertEquals(admin3.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(), 100); |
| assertEquals(admin.topics().getMaxConsumers(topic).intValue(), 101); |
| assertEquals(admin2.topics().getMaxConsumers(topic).intValue(), 101); |
| assertEquals(admin3.topics().getMaxConsumers(topic).intValue(), 101); |
| assertEquals(admin.topics().getMaxProducers(topic).intValue(), 102); |
| assertEquals(admin2.topics().getMaxProducers(topic).intValue(), 102); |
| assertEquals(admin3.topics().getMaxProducers(topic).intValue(), 102); |
| } |
| |
| /** |
| * verifies admin api command for non-persistent topic. It verifies: partitioned-topic, stats |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void nonPersistentTopics() throws Exception { |
| final String topicName = "nonPersistentTopic"; |
| |
| final String persistentTopicName = "non-persistent://prop-xyz/ns1/" + topicName; |
| // Force to create a topic |
| publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 0, 0); |
| |
| // create consumer and subscription |
| @Cleanup |
| PulsarClient client = PulsarClient.builder() |
| .serviceUrl(pulsar.getWebServiceAddress()) |
| .statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub") |
| .subscribe(); |
| |
| publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 10, 0); |
| |
| TopicStats topicStats = admin.topics().getStats(persistentTopicName); |
| assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); |
| assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1); |
| assertEquals(topicStats.publishers.size(), 0); |
| |
| PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false); |
| assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub"))); |
| |
| consumer.close(); |
| |
| topicStats = admin.topics().getStats(persistentTopicName); |
| assertTrue(topicStats.subscriptions.containsKey("my-sub")); |
| assertEquals(topicStats.publishers.size(), 0); |
| |
| // test partitioned-topic |
| final String partitionedTopicName = "non-persistent://prop-xyz/ns1/paritioned"; |
| assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0); |
| admin.topics().createPartitionedTopic(partitionedTopicName, 5); |
| assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5); |
| } |
| |
| private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception { |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| for (int i = startIdx; i < (messages + startIdx); i++) { |
| String message = "message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| producer.close(); |
| } |
| |
| /** |
| * verifies validation on persistent-policies |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testSetPersistencePolicies() throws Exception { |
| |
| final String namespace = "prop-xyz/ns2"; |
| admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); |
| |
| assertNull(admin.namespaces().getPersistence(namespace)); |
| admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 10.0)); |
| assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 10.0)); |
| |
| try { |
| admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 4, 3, 10.0)); |
| fail("should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), 400); |
| } |
| try { |
| admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 4, 10.0)); |
| fail("should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), 400); |
| } |
| try { |
| admin.namespaces().setPersistence(namespace, new PersistencePolicies(6, 3, 1, 10.0)); |
| fail("should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), 400); |
| } |
| |
| // make sure policies has not been changed |
| assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 10.0)); |
| } |
| |
| /** |
| * validates update of persistent-policies reflects on managed-ledger and managed-cursor |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception { |
| |
| final String namespace = "prop-xyz/ns2"; |
| final String topicName = "persistent://" + namespace + "/topic1"; |
| admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); |
| |
| admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0)); |
| assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0)); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); |
| |
| PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); |
| ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger(); |
| ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().iterator().next(); |
| |
| final double newThrottleRate = 100; |
| final int newEnsembleSize = 5; |
| admin.namespaces().setPersistence(namespace, new PersistencePolicies(newEnsembleSize, 3, 3, newThrottleRate)); |
| |
| retryStrategically((test) -> managedLedger.getConfig().getEnsembleSize() == newEnsembleSize |
| && cursor.getThrottleMarkDelete() != newThrottleRate, 5, 200); |
| |
| // (1) verify cursor.markDelete has been updated |
| assertEquals(cursor.getThrottleMarkDelete(), newThrottleRate); |
| |
| // (2) verify new ledger creation takes new config |
| producer.close(); |
| consumer.close(); |
| } |
| |
| /** |
| * Verify unloading topic |
| * |
| * @throws Exception |
| */ |
| @Test(dataProvider = "topicType") |
| public void testUnloadTopic(final String topicType) throws Exception { |
| |
| final String namespace = "prop-xyz/ns2"; |
| final String topicName = topicType + "://" + namespace + "/topic1"; |
| admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); |
| |
| // create a topic by creating a producer |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); |
| producer.close(); |
| |
| Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).join().get(); |
| final boolean isPersistentTopic = topic instanceof PersistentTopic; |
| |
| // (1) unload the topic |
| unloadTopic(topicName); |
| |
| // topic must be removed from map |
| assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); |
| |
| // recreation of producer will load the topic again |
| pulsarClient.newProducer().topic(topicName).create(); |
| topic = pulsar.getBrokerService().getTopicReference(topicName).get(); |
| assertNotNull(topic); |
| // unload the topic |
| unloadTopic(topicName); |
| // producer will retry and recreate the topic |
| for (int i = 0; i < 5; i++) { |
| if (!pulsar.getBrokerService().getTopicReference(topicName).isPresent() || i != 4) { |
| Thread.sleep(200); |
| } |
| } |
| // topic should be loaded by this time |
| topic = pulsar.getBrokerService().getTopicReference(topicName).get(); |
| assertNotNull(topic); |
| } |
| |
| private void unloadTopic(String topicName) throws Exception { |
| admin.topics().unload(topicName); |
| } |
| |
| /** |
| * Verifies reset-cursor at specific position using admin-api. |
| * |
| * <pre> |
| * 1. Publish 50 messages |
| * 2. Consume 20 messages |
| * 3. reset cursor position on 10th message |
| * 4. consume 40 messages from reset position |
| * </pre> |
| * |
| * @param namespaceName |
| * @throws Exception |
| */ |
| @Test(dataProvider = "namespaceNames", timeOut = 10000) |
| public void testResetCursorOnPosition(String namespaceName) throws Exception { |
| final String topicName = "persistent://prop-xyz/use/" + namespaceName + "/resetPosition"; |
| final int totalProducedMessages = 50; |
| |
| // set retention |
| admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10)); |
| |
| // create consumer and subscription |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") |
| .subscriptionType(SubscriptionType.Shared).subscribe(); |
| |
| assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub")); |
| |
| publishMessagesOnPersistentTopic(topicName, totalProducedMessages, 0); |
| |
| List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10); |
| assertEquals(messages.size(), 10); |
| |
| Message<byte[]> message = null; |
| MessageIdImpl resetMessageId = null; |
| int resetPositionId = 10; |
| for (int i = 0; i < 20; i++) { |
| message = consumer.receive(1, TimeUnit.SECONDS); |
| consumer.acknowledge(message); |
| if (i == resetPositionId) { |
| resetMessageId = (MessageIdImpl) message.getMessageId(); |
| } |
| } |
| |
| // close consumer which will clean up internal-receive-queue |
| consumer.close(); |
| |
| // messages should still be available due to retention |
| MessageIdImpl messageId = new MessageIdImpl( |
| resetMessageId.getLedgerId(), |
| resetMessageId.getEntryId(), |
| -1); |
| // reset position at resetMessageId |
| admin.topics().resetCursor(topicName, "my-sub", messageId); |
| |
| consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub") |
| .subscriptionType(SubscriptionType.Shared).subscribe(); |
| MessageIdImpl msgId2 = (MessageIdImpl) consumer.receive(1, TimeUnit.SECONDS).getMessageId(); |
| assertEquals(resetMessageId, msgId2); |
| |
| int receivedAfterReset = 1; // start with 1 because we have already received 1 msg |
| |
| for (int i = 0; i < totalProducedMessages; i++) { |
| message = consumer.receive(500, TimeUnit.MILLISECONDS); |
| if (message == null) { |
| break; |
| } |
| consumer.acknowledge(message); |
| ++receivedAfterReset; |
| } |
| assertEquals(receivedAfterReset, totalProducedMessages - resetPositionId); |
| |
| // invalid topic name |
| try { |
| admin.topics().resetCursor(topicName + "invalid", "my-sub", messageId); |
| fail("It should have failed due to invalid topic name"); |
| } catch (PulsarAdminException.NotFoundException e) { |
| // Ok |
| } |
| |
| // invalid cursor name |
| try { |
| admin.topics().resetCursor(topicName, "invalid-sub", messageId); |
| fail("It should have failed due to invalid subscription name"); |
| } catch (PulsarAdminException.NotFoundException e) { |
| // Ok |
| } |
| |
| // invalid position |
| try { |
| messageId = new MessageIdImpl(0, 0, -1); |
| admin.topics().resetCursor(topicName, "my-sub", messageId); |
| } catch (PulsarAdminException.PreconditionFailedException e) { |
| fail("It shouldn't fail for a invalid position"); |
| } |
| |
| consumer.close(); |
| } |
| |
| private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { |
| Producer<byte[]> producer = pulsarClient.newProducer() |
| .topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| for (int i = startIdx; i < (messages + startIdx); i++) { |
| String message = "message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| producer.close(); |
| } |
| |
| |
| @Test(timeOut = 20000) |
| public void testMaxConsumersOnSubApi() throws Exception { |
| final String namespace = "prop-xyz/ns1"; |
| assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace)); |
| admin.namespaces().setMaxConsumersPerSubscription(namespace, 10); |
| Awaitility.await().untilAsserted(() -> { |
| assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace)); |
| assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10); |
| }); |
| admin.namespaces().removeMaxConsumersPerSubscription(namespace); |
| Awaitility.await().untilAsserted(() -> |
| admin.namespaces().getMaxConsumersPerSubscription(namespace)); |
| } |
| |
| /** |
| * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testLoadReportApi() throws Exception { |
| |
| this.conf.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); |
| MockedPulsarService mockPulsarSetup1 = new MockedPulsarService(this.conf); |
| mockPulsarSetup1.setup(); |
| PulsarAdmin simpleLoadManagerAdmin = mockPulsarSetup1.getAdmin(); |
| assertNotNull(simpleLoadManagerAdmin.brokerStats().getLoadReport()); |
| |
| this.conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); |
| MockedPulsarService mockPulsarSetup2 = new MockedPulsarService(this.conf); |
| mockPulsarSetup2.setup(); |
| PulsarAdmin modularLoadManagerAdmin = mockPulsarSetup2.getAdmin(); |
| assertNotNull(modularLoadManagerAdmin.brokerStats().getLoadReport()); |
| |
| mockPulsarSetup1.cleanup(); |
| mockPulsarSetup2.cleanup(); |
| } |
| |
| @Test |
| public void testPeerCluster() throws Exception { |
| admin.clusters().createCluster("us-west1", |
| new ClusterData("http://broker.messaging.west1.example.com:8080")); |
| admin.clusters().createCluster("us-west2", |
| new ClusterData("http://broker.messaging.west2.example.com:8080")); |
| admin.clusters().createCluster("us-east1", |
| new ClusterData("http://broker.messaging.east1.example.com:8080")); |
| admin.clusters().createCluster("us-east2", |
| new ClusterData("http://broker.messaging.east2.example.com:8080")); |
| |
| admin.clusters().updatePeerClusterNames("us-west1", Sets.newLinkedHashSet(Lists.newArrayList("us-west2"))); |
| assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(), Lists.newArrayList("us-west2")); |
| assertNull(admin.clusters().getCluster("us-west2").getPeerClusterNames()); |
| // update cluster with duplicate peer-clusters in the list |
| admin.clusters().updatePeerClusterNames("us-west1", Sets.newLinkedHashSet( |
| Lists.newArrayList("us-west2", "us-east1", "us-west2", "us-east1", "us-west2", "us-east1"))); |
| assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(), |
| Lists.newArrayList("us-west2", "us-east1")); |
| admin.clusters().updatePeerClusterNames("us-west1", null); |
| assertNull(admin.clusters().getCluster("us-west1").getPeerClusterNames()); |
| |
| // Check name validation |
| try { |
| admin.clusters().updatePeerClusterNames("us-west1", |
| Sets.newLinkedHashSet(Lists.newArrayList("invalid-cluster"))); |
| fail("should have failed"); |
| } catch (PulsarAdminException e) { |
| assertTrue(e instanceof PreconditionFailedException); |
| } |
| |
| // Cluster itself can't be part of peer-list |
| try { |
| admin.clusters().updatePeerClusterNames("us-west1", Sets.newLinkedHashSet(Lists.newArrayList("us-west1"))); |
| fail("should have failed"); |
| } catch (PulsarAdminException e) { |
| assertTrue(e instanceof PreconditionFailedException); |
| } |
| } |
| |
| /** |
| * It validates that peer-cluster can't coexist in replication-cluster list |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testReplicationPeerCluster() throws Exception { |
| admin.clusters().createCluster("us-west1", |
| new ClusterData("http://broker.messaging.west1.example.com:8080")); |
| admin.clusters().createCluster("us-west2", |
| new ClusterData("http://broker.messaging.west2.example.com:8080")); |
| admin.clusters().createCluster("us-west3", |
| new ClusterData("http://broker.messaging.west2.example.com:8080")); |
| admin.clusters().createCluster("us-west4", |
| new ClusterData("http://broker.messaging.west2.example.com:8080")); |
| admin.clusters().createCluster("us-east1", |
| new ClusterData("http://broker.messaging.east1.example.com:8080")); |
| admin.clusters().createCluster("us-east2", |
| new ClusterData("http://broker.messaging.east2.example.com:8080")); |
| admin.clusters().createCluster("global", new ClusterData()); |
| |
| List<String> allClusters = admin.clusters().getClusters(); |
| Collections.sort(allClusters); |
| assertEquals(allClusters, |
| Lists.newArrayList("test", "us-east1", "us-east2", "us-west1", "us-west2", "us-west3", "us-west4")); |
| |
| final String property = "peer-prop"; |
| Set<String> allowedClusters = Sets.newHashSet("us-west1", "us-west2", "us-west3", "us-west4", "us-east1", |
| "us-east2", "global"); |
| TenantInfo propConfig = new TenantInfo(Sets.newHashSet("test"), allowedClusters); |
| admin.tenants().createTenant(property, propConfig); |
| |
| final String namespace = property + "/global/conflictPeer"; |
| admin.namespaces().createNamespace(namespace); |
| |
| admin.clusters().updatePeerClusterNames("us-west1", |
| Sets.newLinkedHashSet(Lists.newArrayList("us-west2", "us-west3"))); |
| assertEquals(admin.clusters().getCluster("us-west1").getPeerClusterNames(), |
| Lists.newArrayList("us-west2", "us-west3")); |
| |
| // (1) no conflicting peer |
| Set<String> clusterIds = Sets.newHashSet("us-east1", "us-east2"); |
| admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds); |
| |
| // (2) conflicting peer |
| clusterIds = Sets.newHashSet("us-west2", "us-west3", "us-west1"); |
| try { |
| admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds); |
| fail("Peer-cluster can't coexist in replication cluster list"); |
| } catch (PulsarAdminException.ConflictException e) { |
| // Ok |
| } |
| |
| clusterIds = Sets.newHashSet("us-west2", "us-west3"); |
| // no peer coexist in replication clusters |
| admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds); |
| |
| clusterIds = Sets.newHashSet("us-west1", "us-west4"); |
| // no peer coexist in replication clusters |
| admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds); |
| } |
| |
| @Test |
| public void clusterFailureDomain() throws PulsarAdminException { |
| |
| final String cluster = pulsar.getConfiguration().getClusterName(); |
| // create |
| FailureDomain domain = new FailureDomain(); |
| domain.setBrokers(Sets.newHashSet("b1", "b2", "b3")); |
| admin.clusters().createFailureDomain(cluster, "domain-1", domain); |
| admin.clusters().updateFailureDomain(cluster, "domain-1", domain); |
| |
| assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain); |
| |
| Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(cluster); |
| assertEquals(domains.size(), 1); |
| assertTrue(domains.containsKey("domain-1")); |
| |
| try { |
| // try to create domain with already registered brokers |
| admin.clusters().createFailureDomain(cluster, "domain-2", domain); |
| fail("should have failed because of brokers are already registered"); |
| } catch (PulsarAdminException.ConflictException e) { |
| // Ok |
| } |
| |
| admin.clusters().deleteFailureDomain(cluster, "domain-1"); |
| assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty()); |
| |
| admin.clusters().createFailureDomain(cluster, "domain-2", domain); |
| domains = admin.clusters().getFailureDomains(cluster); |
| assertEquals(domains.size(), 1); |
| assertTrue(domains.containsKey("domain-2")); |
| } |
| |
| @Test |
| public void namespaceAntiAffinity() throws PulsarAdminException { |
| final String namespace = "prop-xyz/ns1"; |
| final String antiAffinityGroup = "group"; |
| assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace))); |
| admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup); |
| assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup); |
| admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace); |
| assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace))); |
| |
| final String ns1 = "prop-xyz/antiAG1"; |
| final String ns2 = "prop-xyz/antiAG2"; |
| final String ns3 = "prop-xyz/antiAG3"; |
| admin.namespaces().createNamespace(ns1, Sets.newHashSet("test")); |
| admin.namespaces().createNamespace(ns2, Sets.newHashSet("test")); |
| admin.namespaces().createNamespace(ns3, Sets.newHashSet("test")); |
| admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup); |
| admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup); |
| admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup); |
| |
| Set<String> namespaces = new HashSet<>( |
| admin.namespaces().getAntiAffinityNamespaces("prop-xyz", "test", antiAffinityGroup)); |
| assertEquals(namespaces.size(), 3); |
| assertTrue(namespaces.contains(ns1)); |
| assertTrue(namespaces.contains(ns2)); |
| assertTrue(namespaces.contains(ns3)); |
| |
| List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("prop-xyz", "test", "invalid-group"); |
| assertEquals(namespaces2.size(), 0); |
| } |
| |
| @Test |
| public void testNonPersistentTopics() throws Exception { |
| final String namespace = "prop-xyz/ns2"; |
| final String topicName = "non-persistent://" + namespace + "/topic"; |
| admin.namespaces().createNamespace(namespace, 20); |
| admin.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("test")); |
| int totalTopics = 100; |
| |
| Set<String> topicNames = Sets.newHashSet(); |
| for (int i = 0; i < totalTopics; i++) { |
| topicNames.add(topicName + i); |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create(); |
| producer.close(); |
| } |
| |
| for (int i = 0; i < totalTopics; i++) { |
| Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i).get(); |
| assertNotNull(topic); |
| } |
| |
| Set<String> topicsInNs = Sets.newHashSet(admin.topics().getList(namespace)); |
| assertEquals(topicsInNs.size(), totalTopics); |
| topicsInNs.removeAll(topicNames); |
| assertEquals(topicsInNs.size(), 0); |
| } |
| |
| @Test |
| public void testPublishConsumerStats() throws Exception { |
| final String topicName = "statTopic"; |
| final String subscriberName = topicName + "-my-sub-1"; |
| final String topic = "persistent://prop-xyz/ns1/" + topicName; |
| final String producerName = "myProducer"; |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); |
| Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName) |
| .subscriptionType(SubscriptionType.Shared).subscribe(); |
| Producer<byte[]> producer = client.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .producerName(producerName) |
| .create(); |
| |
| retryStrategically((test) -> { |
| TopicStats stats; |
| try { |
| stats = admin.topics().getStats(topic); |
| return stats.publishers.size() > 0 && stats.subscriptions.get(subscriberName) != null |
| && stats.subscriptions.get(subscriberName).consumers.size() > 0; |
| } catch (PulsarAdminException e) { |
| return false; |
| } |
| }, 5, 200); |
| |
| TopicStats topicStats = admin.topics().getStats(topic); |
| assertEquals(topicStats.publishers.size(), 1); |
| assertNotNull(topicStats.publishers.get(0).getAddress()); |
| assertNotNull(topicStats.publishers.get(0).getClientVersion()); |
| assertNotNull(topicStats.publishers.get(0).getConnectedSince()); |
| assertNotNull(topicStats.publishers.get(0).getProducerName()); |
| assertEquals(topicStats.publishers.get(0).getProducerName(), producerName); |
| |
| SubscriptionStats subscriber = topicStats.subscriptions.get(subscriberName); |
| assertNotNull(subscriber); |
| assertEquals(subscriber.consumers.size(), 1); |
| ConsumerStats consumerStats = subscriber.consumers.get(0); |
| assertNotNull(consumerStats.getAddress()); |
| assertNotNull(consumerStats.getClientVersion()); |
| assertNotNull(consumerStats.getConnectedSince()); |
| |
| producer.close(); |
| consumer.close(); |
| } |
| |
| @Test |
| public void testTenantNameWithUnderscore() throws Exception { |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop_xyz", tenantInfo); |
| |
| admin.namespaces().createNamespace("prop_xyz/my-namespace", Sets.newHashSet("test")); |
| |
| String topic = "persistent://prop_xyz/use/my-namespace/my-topic"; |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .create(); |
| |
| TopicStats stats = admin.topics().getStats(topic); |
| assertEquals(stats.publishers.size(), 1); |
| producer.close(); |
| } |
| |
| @Test |
| public void testTenantNameWithInvalidCharacters() { |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| |
| // If we try to create property with invalid characters, it should fail immediately |
| try { |
| admin.tenants().createTenant("prop xyz", tenantInfo); |
| fail("Should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode()); |
| } |
| |
| try { |
| admin.tenants().createTenant("prop&xyz", tenantInfo); |
| fail("Should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode()); |
| } |
| } |
| |
| @Test |
| public void testTenantWithNonexistentClusters() throws Exception { |
| // Check non-existing cluster |
| assertFalse(admin.clusters().getClusters().contains("cluster-non-existing")); |
| |
| Set<String> allowedClusters = Sets.newHashSet("cluster-non-existing"); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters); |
| |
| // If we try to create tenant with nonexistent clusters, it should fail immediately |
| try { |
| admin.tenants().createTenant("test-tenant", tenantInfo); |
| fail("Should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode()); |
| } |
| |
| assertFalse(admin.tenants().getTenants().contains("test-tenant")); |
| |
| // Check existing tenant |
| assertTrue(admin.tenants().getTenants().contains("prop-xyz")); |
| |
| // If we try to update existing tenant with nonexistent clusters, it should fail immediately |
| try { |
| admin.tenants().updateTenant("prop-xyz", tenantInfo); |
| fail("Should have failed"); |
| } catch (PulsarAdminException e) { |
| assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode()); |
| } |
| } |
| |
| @Test |
| public void brokerNamespaceIsolationPolicies() throws Exception { |
| |
| // create |
| String policyName1 = "policy-1"; |
| String cluster = pulsar.getConfiguration().getClusterName(); |
| String namespaceRegex = "other/" + cluster + "/other.*"; |
| String brokerName = pulsar.getAdvertisedAddress(); |
| String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort().get(); |
| NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData(); |
| nsPolicyData1.namespaces = new ArrayList<>(); |
| nsPolicyData1.namespaces.add(namespaceRegex); |
| nsPolicyData1.primary = new ArrayList<>(); |
| nsPolicyData1.primary.add(brokerName + ":[0-9]*"); |
| nsPolicyData1.secondary = new ArrayList<>(); |
| nsPolicyData1.secondary.add(brokerName + ".*"); |
| nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData(); |
| nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available; |
| nsPolicyData1.auto_failover_policy.parameters = new HashMap<>(); |
| nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1"); |
| nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100"); |
| admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, nsPolicyData1); |
| |
| List<BrokerNamespaceIsolationData> brokerIsolationDataList = admin.clusters() |
| .getBrokersWithNamespaceIsolationPolicy(cluster); |
| assertEquals(brokerIsolationDataList.size(), 1); |
| assertEquals(brokerIsolationDataList.get(0).brokerName, brokerAddress); |
| assertEquals(brokerIsolationDataList.get(0).namespaceRegex.size(), 1); |
| assertEquals(brokerIsolationDataList.get(0).namespaceRegex.get(0), namespaceRegex); |
| |
| BrokerNamespaceIsolationData brokerIsolationData = admin.clusters() |
| .getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress); |
| assertEquals(brokerIsolationData.brokerName, brokerAddress); |
| assertEquals(brokerIsolationData.namespaceRegex.size(), 1); |
| assertEquals(brokerIsolationData.namespaceRegex.get(0), namespaceRegex); |
| |
| BrokerNamespaceIsolationData isolationData = admin.clusters().getBrokerWithNamespaceIsolationPolicy(cluster, "invalid-broker"); |
| assertFalse(isolationData.isPrimary); |
| } |
| |
| // create 1 namespace: |
| // 0. without isolation policy configured, lookup will success. |
| // 1. with matched isolation broker configured and matched, lookup will success. |
| // 2. update isolation policy, without broker matched, lookup will fail. |
| @Test |
| public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception { |
| String brokerName = pulsar.getAdvertisedAddress(); |
| String ns1Name = "prop-xyz/test_ns1_iso_" + System.currentTimeMillis(); |
| admin.namespaces().createNamespace(ns1Name, Sets.newHashSet("test")); |
| |
| // 0. without isolation policy configured, lookup will success. |
| String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1"); |
| assertTrue(brokerUrl.contains(brokerName)); |
| log.info("0 get lookup url {}", brokerUrl); |
| |
| // create |
| String policyName1 = "policy-1"; |
| String cluster = pulsar.getConfiguration().getClusterName(); |
| NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData(); |
| nsPolicyData1.namespaces = new ArrayList<>(); |
| nsPolicyData1.namespaces.add(ns1Name); |
| nsPolicyData1.primary = new ArrayList<>(); |
| nsPolicyData1.primary.add(brokerName + ".*"); |
| nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData(); |
| nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available; |
| nsPolicyData1.auto_failover_policy.parameters = new HashMap<>(); |
| nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1"); |
| nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100"); |
| admin.clusters().createNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get(); |
| |
| // 1. with matched isolation broker configured and matched, lookup will success. |
| brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2"); |
| assertTrue(brokerUrl.contains(brokerName)); |
| log.info(" 1 get lookup url {}", brokerUrl); |
| |
| // 2. update isolation policy, without broker matched, lookup will fail. |
| nsPolicyData1.primary = new ArrayList<>(); |
| nsPolicyData1.primary.add(brokerName + "not_match"); |
| admin.clusters().updateNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get(); |
| |
| try { |
| admin.lookups().lookupTopic(ns1Name + "/topic3"); |
| } catch (Exception e) { |
| // expected lookup fail, because no brokers matched the policy. |
| log.info(" 2 expected fail lookup"); |
| } |
| |
| try { |
| admin.lookups().lookupTopic(ns1Name + "/topic1"); |
| } catch (Exception e) { |
| // expected lookup fail, because no brokers matched the policy. |
| log.info(" 22 expected fail lookup"); |
| } |
| } |
| |
| @Test |
| public void clustersList() throws PulsarAdminException { |
| final String cluster = pulsar.getConfiguration().getClusterName(); |
| admin.clusters().createCluster("global", new ClusterData("http://localhost:6650")); |
| |
| // Global cluster, if there, should be omitted from the results |
| assertEquals(admin.clusters().getClusters(), Lists.newArrayList(cluster)); |
| } |
| /** |
| * verifies cluster has been set before create topic |
| * |
| * @throws PulsarAdminException |
| */ |
| @Test |
| public void testClusterIsReadyBeforeCreateTopic() throws PulsarAdminException { |
| final String topicName = "partitionedTopic"; |
| final int partitions = 4; |
| final String persistentPartitionedTopicName = "persistent://prop-xyz/ns2/" + topicName; |
| final String NonPersistentPartitionedTopicName = "non-persistent://prop-xyz/ns2/" + topicName; |
| |
| admin.namespaces().createNamespace("prop-xyz/ns2"); |
| // By default the cluster will configure as configuration file. So the create topic operation |
| // will never throw exception except there is no cluster. |
| admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns2", new HashSet<String>()); |
| |
| try { |
| admin.topics().createPartitionedTopic(persistentPartitionedTopicName, partitions); |
| Assert.fail("should have failed due to Namespace does not have any clusters configured"); |
| } catch (PulsarAdminException.PreconditionFailedException ignored) { |
| } |
| |
| try { |
| admin.topics().createPartitionedTopic(NonPersistentPartitionedTopicName, partitions); |
| Assert.fail("should have failed due to Namespace does not have any clusters configured"); |
| } catch (PulsarAdminException.PreconditionFailedException ignored) { |
| } |
| } |
| |
| @Test |
| public void testCreateNamespaceWithNoClusters() throws PulsarAdminException { |
| String localCluster = pulsar.getConfiguration().getClusterName(); |
| String namespace = "prop-xyz/test-ns-with-no-clusters"; |
| admin.namespaces().createNamespace(namespace); |
| |
| // Global cluster, if there, should be omitted from the results |
| assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace), |
| Collections.singletonList(localCluster)); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testConsumerStatsLastTimestamp() throws PulsarClientException, PulsarAdminException, InterruptedException { |
| long timestamp = System.currentTimeMillis(); |
| final String topicName = "consumer-stats-" + timestamp; |
| final String subscribeName = topicName + "-test-stats-sub"; |
| final String topic = "persistent://prop-xyz/ns1/" + topicName; |
| final String producerName = "producer-" + topicName; |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); |
| Producer<byte[]> producer = client.newProducer().topic(topic) |
| .enableBatching(false) |
| .producerName(producerName) |
| .create(); |
| |
| // a. Send a message to the topic. |
| producer.send("message-1".getBytes(StandardCharsets.UTF_8)); |
| |
| // b. Create a consumer, because there was a message in the topic, the consumer will receive the message pushed |
| // by the broker, the lastConsumedTimestamp will as the consume subscribe time. |
| Consumer<byte[]> consumer = client.newConsumer().topic(topic) |
| .subscriptionName(subscribeName) |
| .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) |
| .acknowledgmentGroupTime(0, TimeUnit.SECONDS) |
| .subscribe(); |
| Message<byte[]> message = consumer.receive(); |
| |
| // Get the consumer stats. |
| TopicStats topicStats = admin.topics().getStats(topic); |
| SubscriptionStats subscriptionStats = topicStats.subscriptions.get(subscribeName); |
| long startConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; |
| long startAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; |
| ConsumerStats consumerStats = subscriptionStats.consumers.get(0); |
| long startConsumedTimestampInConsumerStats = consumerStats.lastConsumedTimestamp; |
| long startAckedTimestampInConsumerStats = consumerStats.lastAckedTimestamp; |
| |
| // Because the message was pushed by the broker, the consumedTimestamp should not as 0. |
| assertNotEquals(0, startConsumedTimestampInConsumerStats); |
| // There is no consumer ack the message, so the lastAckedTimestamp still as 0. |
| assertEquals(0, startAckedTimestampInConsumerStats); |
| assertNotEquals(0, startConsumedFlowTimestamp); |
| assertEquals(0, startAckedTimestampInSubStats); |
| |
| // c. The Consumer receives the message and acks the message. |
| consumer.acknowledge(message); |
| // Waiting for the ack command send to the broker. |
| while (true) { |
| topicStats = admin.topics().getStats(topic); |
| if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != 0) { |
| break; |
| } |
| TimeUnit.MILLISECONDS.sleep(100); |
| } |
| |
| // Get the consumer stats. |
| topicStats = admin.topics().getStats(topic); |
| subscriptionStats = topicStats.subscriptions.get(subscribeName); |
| long consumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; |
| long ackedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; |
| consumerStats = subscriptionStats.consumers.get(0); |
| long consumedTimestamp = consumerStats.lastConsumedTimestamp; |
| long ackedTimestamp = consumerStats.lastAckedTimestamp; |
| |
| // The lastConsumedTimestamp should same as the last time because the broker does not push any messages and the |
| // consumer does not pull any messages. |
| assertEquals(startConsumedTimestampInConsumerStats, consumedTimestamp); |
| assertTrue(startAckedTimestampInConsumerStats < ackedTimestamp); |
| assertNotEquals(0, consumedFlowTimestamp); |
| assertTrue(startAckedTimestampInSubStats < ackedTimestampInSubStats); |
| |
| // d. Send another messages. The lastConsumedTimestamp should be updated. |
| producer.send("message-2".getBytes(StandardCharsets.UTF_8)); |
| |
| // e. Receive the message and ack it. |
| message = consumer.receive(); |
| consumer.acknowledge(message); |
| // Waiting for the ack command send to the broker. |
| while (true) { |
| topicStats = admin.topics().getStats(topic); |
| if (topicStats.subscriptions.get(subscribeName).lastAckedTimestamp != ackedTimestampInSubStats) { |
| break; |
| } |
| TimeUnit.MILLISECONDS.sleep(100); |
| } |
| |
| // Get the consumer stats again. |
| topicStats = admin.topics().getStats(topic); |
| subscriptionStats = topicStats.subscriptions.get(subscribeName); |
| long lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp; |
| long lastConsumedTimestampInSubStats = subscriptionStats.lastConsumedTimestamp; |
| long lastAckedTimestampInSubStats = subscriptionStats.lastAckedTimestamp; |
| consumerStats = subscriptionStats.consumers.get(0); |
| long lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; |
| long lastAckedTimestamp = consumerStats.lastAckedTimestamp; |
| |
| assertTrue(consumedTimestamp < lastConsumedTimestamp); |
| assertTrue(ackedTimestamp < lastAckedTimestamp); |
| assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp); |
| assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp); |
| assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats); |
| assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats); |
| |
| consumer.close(); |
| producer.close(); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testPreciseBacklog() throws PulsarClientException, PulsarAdminException, InterruptedException { |
| final String topic = "persistent://prop-xyz/ns1/precise-back-log"; |
| final String subName = "sub-name"; |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); |
| |
| @Cleanup |
| Consumer<byte[]> consumer = client.newConsumer() |
| .topic(topic) |
| .subscriptionName(subName) |
| .acknowledgmentGroupTime(0, TimeUnit.SECONDS) |
| .subscribe(); |
| |
| @Cleanup |
| Producer<byte[]> producer = client.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| producer.send("message-1".getBytes(StandardCharsets.UTF_8)); |
| Message<byte[]> message = consumer.receive(); |
| assertNotNull(message); |
| |
| // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count |
| // Since message have not acked, so the backlog is 10 |
| PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName); |
| assertNotNull(subscription); |
| ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L); |
| TopicStats topicStats = admin.topics().getStats(topic); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10); |
| |
| topicStats = admin.topics().getStats(topic, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).backlogSize, 43); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1); |
| consumer.acknowledge(message); |
| |
| // wait for ack send |
| Thread.sleep(500); |
| |
| // Consumer acks the message, so the precise backlog is 0 |
| topicStats = admin.topics().getStats(topic, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).backlogSize, 0); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0); |
| |
| topicStats = admin.topics().getStats(topic); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 9); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException { |
| final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed-" + UUID.randomUUID().toString(); |
| final String subName = "sub-name"; |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); |
| |
| @Cleanup |
| Consumer<byte[]> consumer = client.newConsumer() |
| .topic(topic) |
| .subscriptionName(subName) |
| .subscriptionType(SubscriptionType.Shared) |
| .acknowledgmentGroupTime(0, TimeUnit.SECONDS) |
| .subscribe(); |
| |
| @Cleanup |
| Producer<byte[]> producer = client.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| for (int i = 0; i < 10; i++) { |
| if (i > 4) { |
| producer.newMessage() |
| .value("message-1".getBytes(StandardCharsets.UTF_8)) |
| .deliverAfter(10, TimeUnit.SECONDS) |
| .send(); |
| } else { |
| producer.send("message-1".getBytes(StandardCharsets.UTF_8)); |
| } |
| } |
| |
| // Wait for messages to be tracked for delayed delivery. This happens |
| // on the consumer dispatch side, so when the send() is complete we're |
| // not yet guaranteed to see the stats updated. |
| Thread.sleep(500); |
| |
| TopicStats topicStats = admin.topics().getStats(topic, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5); |
| |
| for (int i = 0; i < 5; i++) { |
| consumer.acknowledge(consumer.receive()); |
| } |
| // Wait the ack send. |
| Thread.sleep(500); |
| topicStats = admin.topics().getStats(topic, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0); |
| } |
| |
| @Test |
| public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException, PulsarAdminException { |
| final String topic = "persistent://prop-xyz/ns1/precise-back-log-for-partitioned-topic"; |
| admin.topics().createPartitionedTopic(topic, 2); |
| final String subName = "sub-name"; |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); |
| |
| @Cleanup |
| Consumer<byte[]> consumer = client.newConsumer() |
| .topic(topic) |
| .subscriptionName(subName) |
| .acknowledgmentGroupTime(0, TimeUnit.SECONDS) |
| .subscribe(); |
| |
| @Cleanup |
| Producer<byte[]> producer = client.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| producer.send("message-1".getBytes(StandardCharsets.UTF_8)); |
| Message<byte[]> message = consumer.receive(); |
| assertNotNull(message); |
| |
| // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count |
| // Since message have not acked, so the backlog is 10 |
| for (int i = 0; i < 2; i++) { |
| PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic + "-partition-" + i).get().getSubscription(subName); |
| assertNotNull(subscription); |
| ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L); |
| } |
| |
| TopicStats topicStats = admin.topics().getPartitionedStats(topic, false); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 20); |
| |
| topicStats = admin.topics().getPartitionedStats(topic, false, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1); |
| assertEquals(topicStats.subscriptions.get(subName).backlogSize, 43); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException { |
| final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed-partitioned-topic"; |
| admin.topics().createPartitionedTopic(topic, 2); |
| final String subName = "sub-name"; |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); |
| |
| @Cleanup |
| Consumer<byte[]> consumer = client.newConsumer() |
| .topic(topic) |
| .subscriptionName(subName) |
| .subscriptionType(SubscriptionType.Shared) |
| .acknowledgmentGroupTime(0, TimeUnit.SECONDS) |
| .subscribe(); |
| |
| @Cleanup |
| Producer<byte[]> producer = client.newProducer() |
| .topic(topic) |
| .enableBatching(false) |
| .create(); |
| |
| for (int i = 0; i < 10; i++) { |
| if (i > 4) { |
| producer.newMessage() |
| .value("message-1".getBytes(StandardCharsets.UTF_8)) |
| .deliverAfter(10, TimeUnit.SECONDS) |
| .send(); |
| } else { |
| producer.send("message-1".getBytes(StandardCharsets.UTF_8)); |
| } |
| } |
| |
| TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10); |
| assertEquals(topicStats.subscriptions.get(subName).backlogSize, 470); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5); |
| |
| for (int i = 0; i < 5; i++) { |
| consumer.acknowledge(consumer.receive()); |
| } |
| // Wait the ack send. |
| Thread.sleep(500); |
| topicStats = admin.topics().getPartitionedStats(topic, false, true, true); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5); |
| assertEquals(topicStats.subscriptions.get(subName).backlogSize, 238); |
| assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0); |
| } |
| |
| @Test |
| public void testMaxNumPartitionsPerPartitionedTopicSuccess() { |
| final String topic = "persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-success"; |
| pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(3); |
| |
| try { |
| admin.topics().createPartitionedTopic(topic, 2); |
| } catch (Exception e) { |
| fail("should not throw any exceptions"); |
| } |
| |
| // reset configuration |
| pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0); |
| } |
| |
| @Test |
| public void testMaxNumPartitionsPerPartitionedTopicFailure() { |
| final String topic = "persistent://prop-xyz/ns1/max-num-partitions-per-partitioned-topic-failure"; |
| pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(2); |
| |
| try { |
| admin.topics().createPartitionedTopic(topic, 3); |
| fail("should throw exception when number of partitions exceed than max partitions"); |
| } catch (Exception e) { |
| assertTrue(e instanceof PulsarAdminException); |
| } |
| |
| // reset configuration |
| pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0); |
| } |
| |
| @Test |
| public void testListOfNamespaceBundles() throws Exception { |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop-xyz2", tenantInfo); |
| admin.namespaces().createNamespace("prop-xyz2/ns1", 10); |
| admin.namespaces().setNamespaceReplicationClusters("prop-xyz2/ns1", Sets.newHashSet("test")); |
| admin.namespaces().createNamespace("prop-xyz2/test/ns2", 10); |
| assertEquals(admin.namespaces().getBundles("prop-xyz2/ns1").numBundles, 10); |
| assertEquals(admin.namespaces().getBundles("prop-xyz2/test/ns2").numBundles, 10); |
| } |
| |
| @Test |
| public void testUpdateClusterWithProxyUrl() throws Exception { |
| ClusterData cluster = new ClusterData(pulsar.getWebServiceAddress()); |
| String clusterName = "test2"; |
| admin.clusters().createCluster(clusterName, cluster); |
| Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster); |
| |
| // update |
| cluster.setProxyServiceUrl("proxy"); |
| cluster.setProxyProtocol(ProxyProtocol.SNI); |
| admin.clusters().updateCluster(clusterName, cluster); |
| Assert.assertEquals(admin.clusters().getCluster(clusterName), cluster); |
| } |
| |
| @Test |
| public void testMaxNamespacesPerTenant() throws Exception { |
| super.internalCleanup(); |
| conf.setMaxNamespacesPerTenant(2); |
| super.internalSetup(); |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| admin.namespaces().createNamespace("testTenant/ns2", Sets.newHashSet("test")); |
| try { |
| admin.namespaces().createNamespace("testTenant/ns3", Sets.newHashSet("test")); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| Assert.assertEquals(e.getHttpError(), "Exceed the maximum number of namespace in tenant :testTenant"); |
| } |
| |
| //unlimited |
| super.internalCleanup(); |
| conf.setMaxNamespacesPerTenant(0); |
| super.internalSetup(); |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| for (int i = 0; i < 10; i++) { |
| admin.namespaces().createNamespace("testTenant/ns-" + i, Sets.newHashSet("test")); |
| } |
| } |
| |
| @Test |
| public void testMaxTopicsPerNamespace() throws Exception { |
| super.internalCleanup(); |
| conf.setMaxTopicsPerNamespace(10); |
| super.internalSetup(); |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| |
| // check create partitioned/non-partitioned topics |
| String topic = "persistent://testTenant/ns1/test_create_topic_v"; |
| admin.topics().createPartitionedTopic(topic + "1", 2); |
| admin.topics().createPartitionedTopic(topic + "2", 3); |
| admin.topics().createPartitionedTopic(topic + "3", 4); |
| admin.topics().createNonPartitionedTopic(topic + "4"); |
| try { |
| admin.topics().createPartitionedTopic(topic + "5", 2); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| Assert.assertEquals(e.getStatusCode(), 412); |
| Assert.assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace."); |
| } |
| |
| //unlimited |
| super.internalCleanup(); |
| conf.setMaxTopicsPerNamespace(0); |
| super.internalSetup(); |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| for (int i = 0; i < 10; ++i) { |
| admin.topics().createPartitionedTopic(topic + i, 2); |
| admin.topics().createNonPartitionedTopic(topic + i + i); |
| } |
| |
| // check producer/consumer auto create partitioned topic |
| super.internalCleanup(); |
| conf.setMaxTopicsPerNamespace(10); |
| conf.setDefaultNumPartitions(3); |
| conf.setAllowAutoTopicCreationType("partitioned"); |
| super.internalSetup(); |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| |
| pulsarClient.newProducer().topic(topic + "1").create().close(); |
| pulsarClient.newProducer().topic(topic + "2").create().close(); |
| pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close(); |
| try { |
| pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe().close(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Exception: ", e); |
| } |
| |
| // check producer/consumer auto create non-partitioned topic |
| super.internalCleanup(); |
| conf.setMaxTopicsPerNamespace(3); |
| conf.setAllowAutoTopicCreationType("non-partitioned"); |
| super.internalSetup(); |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| |
| pulsarClient.newProducer().topic(topic + "1").create().close(); |
| pulsarClient.newProducer().topic(topic + "2").create().close(); |
| pulsarClient.newConsumer().topic(topic + "3").subscriptionName("test_sub").subscribe().close(); |
| try { |
| pulsarClient.newConsumer().topic(topic + "4").subscriptionName("test_sub").subscribe().close(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("Exception: ", e); |
| } |
| |
| // reset configuration |
| conf.setMaxTopicsPerNamespace(0); |
| conf.setDefaultNumPartitions(1); |
| } |
| |
| @Test |
| public void testInvalidBundleErrorResponse() throws Exception { |
| try { |
| admin.namespaces().deleteNamespaceBundle("prop-xyz/ns1", "invalid-bundle"); |
| fail("should have failed due to invalid bundle"); |
| } catch (PreconditionFailedException e) { |
| assertTrue(e.getMessage().startsWith("Invalid bundle range")); |
| } |
| } |
| |
| @Test |
| public void testMaxSubscriptionsPerTopic() throws Exception { |
| super.internalCleanup(); |
| conf.setMaxSubscriptionsPerTopic(2); |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| |
| final String topic = "persistent://testTenant/ns1/max-subscriptions-per-topic"; |
| |
| admin.topics().createPartitionedTopic(topic, 3); |
| Producer producer = pulsarClient.newProducer().topic(topic).create(); |
| producer.close(); |
| |
| // create subscription |
| admin.topics().createSubscription(topic, "test-sub1", MessageId.earliest); |
| admin.topics().createSubscription(topic, "test-sub2", MessageId.earliest); |
| try { |
| admin.topics().createSubscription(topic, "test-sub3", MessageId.earliest); |
| Assert.fail(); |
| } catch (PulsarAdminException e) { |
| log.info("create subscription failed. Exception: ", e); |
| } |
| |
| super.internalCleanup(); |
| conf.setMaxSubscriptionsPerTopic(0); |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| |
| admin.topics().createPartitionedTopic(topic, 3); |
| producer = pulsarClient.newProducer().topic(topic).create(); |
| producer.close(); |
| |
| for (int i = 0; i < 10; ++i) { |
| admin.topics().createSubscription(topic, "test-sub" + i, MessageId.earliest); |
| } |
| |
| super.internalCleanup(); |
| conf.setMaxSubscriptionsPerTopic(2); |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString())); |
| admin.tenants().createTenant("testTenant", tenantInfo); |
| admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); |
| |
| admin.topics().createPartitionedTopic(topic, 3); |
| producer = pulsarClient.newProducer().topic(topic).create(); |
| producer.close(); |
| |
| Consumer consumer1 = null; |
| Consumer consumer2 = null; |
| Consumer consumer3 = null; |
| |
| try { |
| consumer1 = pulsarClient.newConsumer().subscriptionName("test-sub1").topic(topic).subscribe(); |
| Assert.assertNotNull(consumer1); |
| } catch (PulsarClientException e) { |
| Assert.fail(); |
| } |
| |
| try { |
| consumer2 = pulsarClient.newConsumer().subscriptionName("test-sub2").topic(topic).subscribe(); |
| Assert.assertNotNull(consumer2); |
| } catch (PulsarClientException e) { |
| Assert.fail(); |
| } |
| |
| try { |
| consumer3 = pulsarClient.newConsumer().subscriptionName("test-sub3").topic(topic).subscribe(); |
| Assert.fail(); |
| } catch (PulsarClientException e) { |
| log.info("subscription reached max subscriptions per topic"); |
| } |
| |
| consumer1.close(); |
| consumer2.close(); |
| admin.topics().deletePartitionedTopic(topic); |
| } |
| |
| @Test(timeOut = 30000) |
| public void testMaxSubPerTopicApi() throws Exception { |
| final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); |
| admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); |
| |
| assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)); |
| |
| admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,100); |
| assertEquals(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace).intValue(),100); |
| admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); |
| assertNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)); |
| |
| admin.namespaces().setMaxSubscriptionsPerTopicAsync(myNamespace,200).get(); |
| assertEquals(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get().intValue(),200); |
| admin.namespaces().removeMaxSubscriptionsPerTopicAsync(myNamespace); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() |
| -> assertNull(admin.namespaces().getMaxSubscriptionsPerTopicAsync(myNamespace).get())); |
| |
| try { |
| admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace,-100); |
| fail("should fail"); |
| } catch (PulsarAdminException ignore) { |
| } |
| } |
| |
| @Test(timeOut = 30000) |
| public void testMaxSubPerTopic() throws Exception { |
| final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); |
| admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); |
| final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic"; |
| pulsarClient.newProducer().topic(topic).create().close(); |
| final int maxSub = 2; |
| admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub); |
| PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); |
| field.setAccessible(true); |
| Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == maxSub); |
| |
| List<Consumer<?>> consumerList = new ArrayList<>(maxSub); |
| for (int i = 0; i < maxSub; i++) { |
| Consumer<?> consumer = |
| pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); |
| consumerList.add(consumer); |
| } |
| //Create a client that can fail quickly |
| try (PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS) |
| .serviceUrl(brokerUrl.toString()).build()){ |
| client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); |
| fail("should fail"); |
| } catch (Exception ignore) { |
| } |
| //After removing the restriction, it should be able to create normally |
| admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); |
| Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null); |
| Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) |
| .subscribe(); |
| consumerList.add(consumer); |
| |
| for (Consumer<?> c : consumerList) { |
| c.close(); |
| } |
| } |
| |
| @Test(timeOut = 30000) |
| public void testMaxSubPerTopicPriority() throws Exception { |
| final int brokerLevelMaxSub = 2; |
| super.internalCleanup(); |
| mockPulsarSetup.cleanup(); |
| conf.setMaxSubscriptionsPerTopic(brokerLevelMaxSub); |
| super.internalSetup(); |
| |
| admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop-xyz", tenantInfo); |
| final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); |
| admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); |
| final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic"; |
| //Create a client that can fail quickly |
| PulsarClient client = PulsarClient.builder().operationTimeout(2,TimeUnit.SECONDS) |
| .serviceUrl(brokerUrl.toString()).build(); |
| //We can only create 2 consumers |
| List<Consumer<?>> consumerList = new ArrayList<>(brokerLevelMaxSub); |
| for (int i = 0; i < brokerLevelMaxSub; i++) { |
| Consumer<?> consumer = |
| pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); |
| consumerList.add(consumer); |
| } |
| try { |
| client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); |
| fail("should fail"); |
| } catch (Exception ignore) { |
| |
| } |
| //Set namespace-level policy,the limit should up to 4 |
| final int nsLevelMaxSub = 4; |
| admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub); |
| PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); |
| field.setAccessible(true); |
| Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub); |
| Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) |
| .subscribe(); |
| consumerList.add(consumer); |
| assertEquals(consumerList.size(), 3); |
| //After removing the restriction, it should fail again |
| admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); |
| Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null); |
| try { |
| client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); |
| fail("should fail"); |
| } catch (Exception ignore) { |
| |
| } |
| |
| for (Consumer<?> c : consumerList) { |
| c.close(); |
| } |
| client.close(); |
| } |
| |
| @Test |
| public void testMaxProducersPerTopicUnlimited() throws Exception { |
| final int maxProducersPerTopic = 1; |
| super.internalCleanup(); |
| mockPulsarSetup.cleanup(); |
| conf.setMaxProducersPerTopic(maxProducersPerTopic); |
| super.internalSetup(); |
| //init namespace |
| admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop-xyz", tenantInfo); |
| final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); |
| admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); |
| final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited"; |
| //the policy is set to 0, so there will be no restrictions |
| admin.namespaces().setMaxProducersPerTopic(myNamespace, 0); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0); |
| List<Producer<byte[]>> producers = new ArrayList<>(); |
| for (int i = 0; i < maxProducersPerTopic + 1; i++) { |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| producers.add(producer); |
| } |
| |
| admin.namespaces().removeMaxProducersPerTopic(myNamespace); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null); |
| try { |
| pulsarClient.newProducer().topic(topic).create(); |
| fail("should fail"); |
| } catch (PulsarClientException e) { |
| assertTrue(e.getMessage().contains("Topic reached max producers limit")); |
| } |
| //set the limit to 3 |
| admin.namespaces().setMaxProducersPerTopic(myNamespace, 3); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3); |
| // should success |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| producers.add(producer); |
| try { |
| pulsarClient.newProducer().topic(topic).create(); |
| fail("should fail"); |
| } catch (PulsarClientException e) { |
| assertTrue(e.getMessage().contains("Topic reached max producers limit")); |
| } |
| |
| //clean up |
| for (Producer<byte[]> tempProducer : producers) { |
| tempProducer.close(); |
| } |
| } |
| |
| @Test |
| public void testMaxConsumersPerTopicUnlimited() throws Exception { |
| final int maxConsumersPerTopic = 1; |
| super.internalCleanup(); |
| mockPulsarSetup.cleanup(); |
| conf.setMaxConsumersPerTopic(maxConsumersPerTopic); |
| super.internalSetup(); |
| //init namespace |
| admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress())); |
| TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); |
| admin.tenants().createTenant("prop-xyz", tenantInfo); |
| final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); |
| admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); |
| final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited"; |
| |
| assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)); |
| //the policy is set to 0, so there will be no restrictions |
| admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0); |
| List<Consumer<byte[]>> consumers = new ArrayList<>(); |
| for (int i = 0; i < maxConsumersPerTopic + 1; i++) { |
| Consumer<byte[]> consumer = |
| pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); |
| consumers.add(consumer); |
| } |
| |
| admin.namespaces().removeMaxConsumersPerTopic(myNamespace); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null); |
| try { |
| pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException e) { |
| assertTrue(e.getMessage().contains("Topic reached max consumers limit")); |
| } |
| //set the limit to 3 |
| admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3); |
| Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() |
| -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3); |
| // should success |
| Consumer<byte[]> consumer = |
| pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); |
| consumers.add(consumer); |
| try { |
| pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); |
| fail("should fail"); |
| } catch (PulsarClientException e) { |
| assertTrue(e.getMessage().contains("Topic reached max consumers limit")); |
| } |
| |
| //clean up |
| for (Consumer<byte[]> subConsumer : consumers) { |
| subConsumer.close(); |
| } |
| } |
| |
| @Test |
| public void testClearBacklogForTheSubscriptionThatNoConsumers() throws Exception { |
| final String topic = "persistent://prop-xyz/ns1/clear_backlog_no_consumers" + UUID.randomUUID().toString(); |
| final String sub = "my-sub"; |
| admin.topics().createNonPartitionedTopic(topic); |
| admin.topics().createSubscription(topic, sub, MessageId.earliest); |
| admin.topics().skipAllMessages(topic, sub); |
| } |
| |
| @Test(timeOut = 200000) |
| public void testCompactionApi() throws Exception { |
| final String namespace = "prop-xyz/ns1"; |
| assertNull(admin.namespaces().getCompactionThreshold(namespace)); |
| assertEquals(pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes(), 0); |
| |
| admin.namespaces().setCompactionThreshold(namespace, 10); |
| Awaitility.await().untilAsserted(() -> |
| assertNotNull(admin.namespaces().getCompactionThreshold(namespace))); |
| assertEquals(admin.namespaces().getCompactionThreshold(namespace).intValue(), 10); |
| |
| admin.namespaces().removeCompactionThreshold(namespace); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(namespace))); |
| } |
| |
| @Test(timeOut = 200000) |
| public void testCompactionPriority() throws Exception { |
| cleanup(); |
| conf.setSystemTopicEnabled(true); |
| conf.setTopicLevelPoliciesEnabled(true); |
| conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10000); |
| setup(); |
| final String topic = "persistent://prop-xyz/ns1/topic" + UUID.randomUUID(); |
| final String namespace = "prop-xyz/ns1"; |
| pulsarClient.newProducer().topic(topic).create().close(); |
| TopicName topicName = TopicName.get(topic); |
| Awaitility.await().until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)); |
| PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); |
| PersistentTopic mockTopic = spy(persistentTopic); |
| mockTopic.checkCompaction(); |
| // Disabled by default |
| verify(mockTopic, times(0)).triggerCompaction(); |
| // Set namespace-level policy |
| admin.namespaces().setCompactionThreshold(namespace, 1); |
| Awaitility.await().untilAsserted(() -> |
| assertNotNull(admin.namespaces().getCompactionThreshold(namespace))); |
| ManagedLedger managedLedger = persistentTopic.getManagedLedger(); |
| Field field = managedLedger.getClass().getDeclaredField("totalSize"); |
| field.setAccessible(true); |
| field.setLong(managedLedger, 1000L); |
| |
| mockTopic.checkCompaction(); |
| verify(mockTopic, times(1)).triggerCompaction(); |
| //Set topic-level policy |
| admin.topics().setCompactionThreshold(topic, 0); |
| Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getCompactionThreshold(topic))); |
| mockTopic.checkCompaction(); |
| verify(mockTopic, times(1)).triggerCompaction(); |
| // Remove topic-level policy |
| admin.topics().removeCompactionThreshold(topic); |
| Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getCompactionThreshold(topic))); |
| mockTopic.checkCompaction(); |
| verify(mockTopic, times(2)).triggerCompaction(); |
| // Remove namespace-level policy |
| admin.namespaces().removeCompactionThreshold(namespace); |
| Awaitility.await().untilAsserted(() -> |
| assertNull(admin.namespaces().getCompactionThreshold(namespace))); |
| mockTopic.checkCompaction(); |
| verify(mockTopic, times(2)).triggerCompaction(); |
| } |
| |
| } |