| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.api; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertNotEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertThrows; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| import com.google.common.collect.Sets; |
| import java.lang.reflect.Field; |
| import java.net.URL; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import lombok.Cleanup; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.loadbalance.LoadManager; |
| import org.apache.pulsar.broker.loadbalance.ResourceUnit; |
| import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; |
| import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; |
| import org.apache.pulsar.broker.service.BrokerService; |
| import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator; |
| import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.impl.ConsumerImpl; |
| import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; |
| import org.apache.pulsar.client.impl.PartitionedProducerImpl; |
| import org.apache.pulsar.client.impl.ProducerImpl; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats; |
| import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats; |
| import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; |
| import org.apache.pulsar.common.policies.data.SubscriptionStats; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; |
| import org.apache.pulsar.zookeeper.ZookeeperServerTest; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.DataProvider; |
| import org.testng.annotations.Test; |
| |
| @Test(groups = "broker-api") |
| public class NonPersistentTopicTest extends ProducerConsumerBase { |
| private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class); |
| private final String configClusterName = "r1"; |
| |
| @DataProvider(name = "subscriptionType") |
| public Object[][] getSubscriptionType() { |
| return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Exclusive } }; |
| } |
| |
| @DataProvider(name = "loadManager") |
| public Object[][] getLoadManager() { |
| return new Object[][] { { SimpleLoadManagerImpl.class.getCanonicalName() }, |
| { ModularLoadManagerImpl.class.getCanonicalName() } }; |
| } |
| |
| @BeforeMethod |
| @Override |
| protected void setup() throws Exception { |
| super.internalSetup(); |
| super.producerBaseSetup(); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| @Test(timeOut = 90000 /* 1.5mn */) |
| public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicDoesNotExist() throws Exception { |
| final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation(); |
| try { |
| // Given the auto topic creation is disabled |
| cleanup(); |
| conf.setAllowAutoTopicCreation(false); |
| setup(); |
| |
| final String topicPartitionName = "non-persistent://public/default/issue-9173-partition-0"; |
| |
| // Then error when subscribe to a partition of a non-persistent topic that does not exist |
| assertThrows(PulsarClientException.NotFoundException.class, |
| () -> pulsarClient.newConsumer().topic(topicPartitionName).subscriptionName("sub-issue-9173").subscribe()); |
| |
| // Then error when produce to a partition of a non-persistent topic that does not exist |
| try { |
| pulsarClient.newProducer().topic(topicPartitionName).create(); |
| Assert.fail("Should failed due to topic not exist"); |
| } catch (Exception e) { |
| assertTrue(e instanceof PulsarClientException.NotFoundException); |
| } |
| } finally { |
| conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation); |
| } |
| } |
| |
| @Test(timeOut = 90000 /* 1.5mn */) |
| public void testAutoCreateNonPersistentPartitionsWhenThePartitionedTopicExists() throws Exception { |
| final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation(); |
| try { |
| // Given the auto topic creation is disabled |
| cleanup(); |
| conf.setAllowAutoTopicCreation(false); |
| setup(); |
| |
| // Given the non-persistent partitioned topic exists |
| final String topic = "non-persistent://public/default/issue-9173"; |
| admin.topics().createPartitionedTopic(topic, 3); |
| |
| // When subscribe, then a sub-consumer is created for each partition which means the partitions are created |
| final MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer() |
| .topic(topic).subscriptionName("sub-issue-9173").subscribe(); |
| assertEquals(consumer.getConsumers().size(), 3); |
| |
| // When produce, a sub-producer is created for each partition which means the partitions are created |
| PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic).create(); |
| assertEquals(producer.getProducers().size(), 3); |
| |
| consumer.close(); |
| producer.close(); |
| } finally { |
| conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation); |
| } |
| } |
| |
| @Test(dataProvider = "subscriptionType") |
| public void testNonPersistentTopic(SubscriptionType type) throws Exception { |
| log.info("-- Starting {} test --", methodName); |
| |
| final String topic = "non-persistent://my-property/my-ns/unacked-topic"; |
| ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("subscriber-1").subscriptionType(type).subscribe(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| |
| int totalProduceMsg = 500; |
| for (int i = 0; i < totalProduceMsg; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| producer.flush(); |
| |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer.receive(1, TimeUnit.SECONDS); |
| if (msg != null) { |
| consumer.acknowledge(msg); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProduceMsg); |
| |
| producer.close(); |
| consumer.close(); |
| log.info("-- Exiting {} test --", methodName); |
| |
| } |
| |
| @Test(dataProvider = "subscriptionType") |
| public void testPartitionedNonPersistentTopic(SubscriptionType type) throws Exception { |
| log.info("-- Starting {} test --", methodName); |
| |
| final String topic = "non-persistent://my-property/my-ns/partitioned-topic"; |
| admin.topics().createPartitionedTopic(topic, 5); |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1") |
| .subscriptionType(type).subscribe(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| int totalProduceMsg = 500; |
| for (int i = 0; i < totalProduceMsg; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| producer.flush(); |
| |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer.receive(1, TimeUnit.SECONDS); |
| if (msg != null) { |
| consumer.acknowledge(msg); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProduceMsg); |
| |
| producer.close(); |
| consumer.close(); |
| log.info("-- Exiting {} test --", methodName); |
| |
| } |
| |
| @Test(dataProvider = "subscriptionType") |
| public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception { |
| log.info("-- Starting {} test --", methodName); |
| |
| final int numPartitions = 5; |
| final String topic = "non-persistent://my-property/my-ns/partitioned-topic"; |
| admin.topics().createPartitionedTopic(topic, numPartitions); |
| |
| @Cleanup |
| PulsarClient client = PulsarClient.builder() |
| .serviceUrl(pulsar.getBrokerServiceUrl()) |
| .statsInterval(0, TimeUnit.SECONDS) |
| .build(); |
| Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1") |
| .subscriptionType(type).subscribe(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| // Ensure all partitions exist |
| for (int i = 0; i < numPartitions; i++) { |
| TopicName partition = TopicName.get(topic).getPartition(i); |
| assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString())); |
| } |
| |
| int totalProduceMsg = 500; |
| for (int i = 0; i < totalProduceMsg; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| producer.flush(); |
| |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer.receive(1, TimeUnit.SECONDS); |
| if (msg != null) { |
| consumer.acknowledge(msg); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProduceMsg); |
| |
| producer.close(); |
| consumer.close(); |
| log.info("-- Exiting {} test --", methodName); |
| } |
| |
| /** |
| * It verifies that broker doesn't dispatch messages if consumer runs out of permits filled out with messages |
| */ |
| @Test(dataProvider = "subscriptionType") |
| public void testConsumerInternalQueueMaxOut(SubscriptionType type) throws Exception { |
| log.info("-- Starting {} test --", methodName); |
| |
| final String topic = "non-persistent://my-property/my-ns/unacked-topic"; |
| final int queueSize = 10; |
| ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic) |
| .receiverQueueSize(queueSize).subscriptionName("subscriber-1").subscriptionType(type).subscribe(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| |
| int totalProduceMsg = 50; |
| for (int i = 0; i < totalProduceMsg; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| producer.flush(); |
| |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer.receive(1, TimeUnit.SECONDS); |
| if (msg != null) { |
| consumer.acknowledge(msg); |
| String receivedMessage = new String(msg.getData()); |
| log.debug("Received message: [{}]", receivedMessage); |
| String expectedMessage = "my-message-" + i; |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), queueSize); |
| |
| producer.close(); |
| consumer.close(); |
| log.info("-- Exiting {} test --", methodName); |
| |
| } |
| |
| /** |
| * Verifies that broker should failed to publish message if producer publishes messages more than rate limit |
| */ |
| @Test |
| public void testProducerRateLimit() throws Exception { |
| int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection(); |
| try { |
| final String topic = "non-persistent://my-property/my-ns/unacked-topic"; |
| // restart broker with lower publish rate limit |
| conf.setMaxConcurrentNonPersistentMessagePerConnection(1); |
| stopBroker(); |
| startBroker(); |
| // produce message concurrently |
| @Cleanup("shutdownNow") |
| ExecutorService executor = Executors.newFixedThreadPool(5); |
| AtomicBoolean failed = new AtomicBoolean(false); |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1") |
| .subscribe(); |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| byte[] msgData = "testData".getBytes(); |
| final int totalProduceMessages = 10; |
| CountDownLatch latch = new CountDownLatch(totalProduceMessages); |
| for (int i = 0; i < totalProduceMessages; i++) { |
| executor.submit(() -> { |
| try { |
| producer.send(msgData); |
| } catch (Exception e) { |
| log.error("Failed to send message", e); |
| failed.set(true); |
| } |
| latch.countDown(); |
| }); |
| } |
| latch.await(); |
| |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProduceMessages; i++) { |
| msg = consumer.receive(500, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| messageSet.add(new String(msg.getData())); |
| } else { |
| break; |
| } |
| } |
| |
| // publish should not be failed |
| assertFalse(failed.get()); |
| // but as message should be dropped at broker: broker should not receive the message |
| assertNotEquals(messageSet.size(), totalProduceMessages); |
| |
| producer.close(); |
| } finally { |
| conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate); |
| } |
| } |
| |
| /** |
| * verifies message delivery with multiple consumers on shared and failover subscriptions |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testMultipleSubscription() throws Exception { |
| log.info("-- Starting {} test --", methodName); |
| |
| final String topic = "non-persistent://my-property/my-ns/unacked-topic"; |
| ConsumerImpl<byte[]> consumer1Shared = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe(); |
| |
| ConsumerImpl<byte[]> consumer2Shared = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe(); |
| |
| ConsumerImpl<byte[]> consumer1FailOver = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("subscriber-fo").subscriptionType(SubscriptionType.Failover).subscribe(); |
| |
| ConsumerImpl<byte[]> consumer2FailOver = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic) |
| .subscriptionName("subscriber-fo").subscriptionType(SubscriptionType.Failover).subscribe(); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); |
| |
| int totalProduceMsg = 500; |
| for (int i = 0; i < totalProduceMsg; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| producer.flush(); |
| |
| // consume from shared-subscriptions |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer1Shared.receive(500, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| messageSet.add(new String(msg.getData())); |
| } else { |
| break; |
| } |
| } |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer2Shared.receive(500, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| messageSet.add(new String(msg.getData())); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProduceMsg); |
| |
| // consume from failover-subscriptions |
| messageSet.clear(); |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer1FailOver.receive(500, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| messageSet.add(new String(msg.getData())); |
| } else { |
| break; |
| } |
| } |
| for (int i = 0; i < totalProduceMsg; i++) { |
| msg = consumer2FailOver.receive(500, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| messageSet.add(new String(msg.getData())); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProduceMsg); |
| |
| producer.close(); |
| consumer1Shared.close(); |
| consumer2Shared.close(); |
| consumer1FailOver.close(); |
| consumer2FailOver.close(); |
| log.info("-- Exiting {} test --", methodName); |
| |
| } |
| |
| /** |
| * verifies that broker is capturing topic stats correctly |
| */ |
| @Test |
| public void testTopicStats() throws Exception { |
| |
| final String topicName = "non-persistent://my-property/my-ns/unacked-topic"; |
| final String subName = "non-persistent"; |
| final int timeWaitToSync = 100; |
| |
| NonPersistentTopicStats stats; |
| SubscriptionStats subStats; |
| |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) |
| .subscriptionType(SubscriptionType.Shared).subscriptionName(subName).subscribe(); |
| Thread.sleep(timeWaitToSync); |
| |
| NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); |
| assertNotNull(topicRef); |
| |
| rolloverPerIntervalStats(pulsar); |
| stats = topicRef.getStats(false, false, false); |
| subStats = stats.getSubscriptions().values().iterator().next(); |
| |
| // subscription stats |
| assertEquals(stats.getSubscriptions().keySet().size(), 1); |
| assertEquals(subStats.getConsumers().size(), 1); |
| |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); |
| Thread.sleep(timeWaitToSync); |
| |
| int totalProducedMessages = 100; |
| for (int i = 0; i < totalProducedMessages; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| Thread.sleep(timeWaitToSync); |
| |
| rolloverPerIntervalStats(pulsar); |
| stats = topicRef.getStats(false, false, false); |
| subStats = stats.getSubscriptions().values().iterator().next(); |
| |
| assertTrue(subStats.getMsgRateOut() > 0); |
| assertEquals(subStats.getConsumers().size(), 1); |
| assertTrue(subStats.getMsgThroughputOut() > 0); |
| |
| // consumer stats |
| assertTrue(subStats.getConsumers().get(0).getMsgRateOut() > 0.0); |
| assertTrue(subStats.getConsumers().get(0).getMsgThroughputOut() > 0.0); |
| assertEquals(subStats.getMsgRateRedeliver(), 0.0); |
| producer.close(); |
| consumer.close(); |
| |
| } |
| |
| /** |
| * verifies that non-persistent topic replicates using replicator |
| */ |
| @Test |
| public void testReplicator() throws Exception { |
| |
| ReplicationClusterManager replication = new ReplicationClusterManager(); |
| replication.setupReplicationCluster(); |
| try { |
| final String globalTopicName = "non-persistent://pulsar/global/ns/nonPersistentTopic"; |
| final int timeWaitToSync = 100; |
| |
| NonPersistentTopicStats stats; |
| SubscriptionStats subStats; |
| |
| @Cleanup |
| PulsarClient client1 = PulsarClient.builder().serviceUrl(replication.url1.toString()).build(); |
| @Cleanup |
| PulsarClient client2 = PulsarClient.builder().serviceUrl(replication.url2.toString()).build(); |
| @Cleanup |
| PulsarClient client3 = PulsarClient.builder().serviceUrl(replication.url3.toString()).build(); |
| |
| ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) client1.newConsumer().topic(globalTopicName) |
| .subscriptionName("subscriber-1").subscribe(); |
| ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) client1.newConsumer().topic(globalTopicName) |
| .subscriptionName("subscriber-2").subscribe(); |
| |
| ConsumerImpl<byte[]> repl2Consumer = (ConsumerImpl<byte[]>) client2.newConsumer().topic(globalTopicName) |
| .subscriptionName("subscriber-1").subscribe(); |
| ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) client3.newConsumer().topic(globalTopicName) |
| .subscriptionName("subscriber-1").subscribe(); |
| |
| Producer<byte[]> producer = client1.newProducer().topic(globalTopicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| |
| Thread.sleep(timeWaitToSync); |
| |
| PulsarService replicationPulasr = replication.pulsar1; |
| |
| // Replicator for r1 -> r2,r3 |
| NonPersistentTopic topicRef = (NonPersistentTopic) replication.pulsar1.getBrokerService() |
| .getTopicReference(globalTopicName).get(); |
| NonPersistentReplicator replicatorR2 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r2"); |
| NonPersistentReplicator replicatorR3 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r3"); |
| assertNotNull(topicRef); |
| assertNotNull(replicatorR2); |
| assertNotNull(replicatorR3); |
| |
| rolloverPerIntervalStats(replicationPulasr); |
| stats = topicRef.getStats(false, false, false); |
| subStats = stats.getSubscriptions().values().iterator().next(); |
| |
| // subscription stats |
| assertEquals(stats.getSubscriptions().keySet().size(), 2); |
| assertEquals(subStats.getConsumers().size(), 1); |
| |
| Thread.sleep(timeWaitToSync); |
| |
| int totalProducedMessages = 100; |
| for (int i = 0; i < totalProducedMessages; i++) { |
| String message = "my-message-" + i; |
| producer.send(message.getBytes()); |
| } |
| |
| // (1) consume by consumer1 |
| Message<?> msg = null; |
| Set<String> messageSet = Sets.newHashSet(); |
| for (int i = 0; i < totalProducedMessages; i++) { |
| msg = consumer1.receive(300, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| String receivedMessage = new String(msg.getData()); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProducedMessages); |
| |
| // (2) consume by consumer2 |
| messageSet.clear(); |
| for (int i = 0; i < totalProducedMessages; i++) { |
| msg = consumer2.receive(300, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| String receivedMessage = new String(msg.getData()); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProducedMessages); |
| |
| // (3) consume by repl2consumer |
| messageSet.clear(); |
| for (int i = 0; i < totalProducedMessages; i++) { |
| msg = repl2Consumer.receive(300, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| String receivedMessage = new String(msg.getData()); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProducedMessages); |
| |
| // (4) consume by repl3consumer |
| messageSet.clear(); |
| for (int i = 0; i < totalProducedMessages; i++) { |
| msg = repl3Consumer.receive(300, TimeUnit.MILLISECONDS); |
| if (msg != null) { |
| String receivedMessage = new String(msg.getData()); |
| testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i); |
| } else { |
| break; |
| } |
| } |
| assertEquals(messageSet.size(), totalProducedMessages); |
| |
| Thread.sleep(timeWaitToSync); |
| |
| rolloverPerIntervalStats(replicationPulasr); |
| stats = topicRef.getStats(false, false, false); |
| subStats = stats.getSubscriptions().values().iterator().next(); |
| |
| assertTrue(subStats.getMsgRateOut() > 0); |
| assertEquals(subStats.getConsumers().size(), 1); |
| assertTrue(subStats.getMsgThroughputOut() > 0); |
| |
| // consumer stats |
| assertTrue(subStats.getConsumers().get(0).getMsgRateOut() > 0.0); |
| assertTrue(subStats.getConsumers().get(0).getMsgThroughputOut() > 0.0); |
| assertEquals(subStats.getMsgRateRedeliver(), 0.0); |
| |
| producer.close(); |
| consumer1.close(); |
| repl2Consumer.close(); |
| repl3Consumer.close(); |
| } finally { |
| replication.shutdownReplicationCluster(); |
| } |
| |
| } |
| |
| /** |
| * verifies load manager assigns topic only if broker started in non-persistent mode |
| * |
| * <pre> |
| * 1. Start broker with disable non-persistent topic mode |
| * 2. Create namespace with non-persistency set |
| * 3. Create non-persistent topic |
| * 4. Load-manager should not be able to find broker |
| * 5. Create producer on that topic should fail |
| * </pre> |
| */ |
| @Test(dataProvider = "loadManager") |
| public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName) throws Exception { |
| |
| final String namespace = "my-property/my-ns"; |
| final String topicName = "non-persistent://" + namespace + "/loadManager"; |
| final String defaultLoadManagerName = conf.getLoadManagerClassName(); |
| final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics(); |
| try { |
| // start broker to not own non-persistent namespace and create non-persistent namespace |
| stopBroker(); |
| conf.setEnableNonPersistentTopics(false); |
| conf.setLoadManagerClassName(loadManagerName); |
| startBroker(); |
| |
| Field field = PulsarService.class.getDeclaredField("loadManager"); |
| field.setAccessible(true); |
| @SuppressWarnings("unchecked") |
| AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar); |
| LoadManager manager = LoadManager.create(pulsar); |
| manager.start(); |
| LoadManager oldLoadManager = loadManagerRef.getAndSet(manager); |
| oldLoadManager.stop(); |
| |
| NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); |
| LoadManager loadManager = pulsar.getLoadManager().get(); |
| ResourceUnit broker = null; |
| try { |
| broker = loadManager.getLeastLoaded(fdqn).get(); |
| } catch (Exception e) { |
| // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker) |
| } |
| assertNull(broker); |
| |
| try { |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1, |
| TimeUnit.SECONDS); |
| producer.close(); |
| fail("topic loading should have failed"); |
| } catch (Exception e) { |
| // Ok |
| } |
| assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); |
| |
| } finally { |
| conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic); |
| conf.setLoadManagerClassName(defaultLoadManagerName); |
| } |
| |
| } |
| |
| /** |
| * verifies: broker should reject non-persistent topic loading if broker is not enable for non-persistent topic |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testNonPersistentTopicUnderPersistentNamespace() throws Exception { |
| |
| final String namespace = "my-property/my-ns"; |
| final String topicName = "non-persistent://" + namespace + "/persitentNamespace"; |
| |
| final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics(); |
| try { |
| conf.setEnableNonPersistentTopics(false); |
| stopBroker(); |
| startBroker(); |
| try { |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1, |
| TimeUnit.SECONDS); |
| producer.close(); |
| fail("topic loading should have failed"); |
| } catch (Exception e) { |
| // Ok |
| } |
| |
| assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); |
| } finally { |
| conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic); |
| } |
| } |
| |
| /** |
| * verifies that broker started with onlyNonPersistent mode doesn't own persistent-topic |
| * |
| * @param loadManagerName |
| * @throws Exception |
| */ |
| @Test(dataProvider = "loadManager") |
| public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName) throws Exception { |
| |
| final String namespace = "my-property/my-ns"; |
| final String topicName = "persistent://" + namespace + "/loadManager"; |
| final String defaultLoadManagerName = conf.getLoadManagerClassName(); |
| final boolean defaultEnablePersistentTopic = conf.isEnablePersistentTopics(); |
| final boolean defaultEnableNonPersistentTopic = conf.isEnableNonPersistentTopics(); |
| try { |
| // start broker to not own non-persistent namespace and create non-persistent namespace |
| stopBroker(); |
| conf.setEnableNonPersistentTopics(true); |
| conf.setEnablePersistentTopics(false); |
| conf.setLoadManagerClassName(loadManagerName); |
| startBroker(); |
| |
| Field field = PulsarService.class.getDeclaredField("loadManager"); |
| field.setAccessible(true); |
| @SuppressWarnings("unchecked") |
| AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar); |
| LoadManager manager = LoadManager.create(pulsar); |
| manager.start(); |
| LoadManager oldLoadManager = loadManagerRef.getAndSet(manager); |
| oldLoadManager.stop(); |
| |
| NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName)); |
| LoadManager loadManager = pulsar.getLoadManager().get(); |
| ResourceUnit broker = null; |
| try { |
| broker = loadManager.getLeastLoaded(fdqn).get(); |
| } catch (Exception e) { |
| // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker) |
| } |
| assertNull(broker); |
| |
| try { |
| Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1, |
| TimeUnit.SECONDS); |
| producer.close(); |
| fail("topic loading should have failed"); |
| } catch (Exception e) { |
| // Ok |
| } |
| |
| assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); |
| |
| } finally { |
| conf.setEnablePersistentTopics(defaultEnablePersistentTopic); |
| conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic); |
| conf.setLoadManagerClassName(defaultLoadManagerName); |
| } |
| |
| } |
| |
| /** |
| * Verifies msg-drop stats |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testMsgDropStat() throws Exception { |
| |
| int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection(); |
| try { |
| final String topicName = "non-persistent://my-property/my-ns/stats-topic"; |
| // restart broker with lower publish rate limit |
| conf.setMaxConcurrentNonPersistentMessagePerConnection(1); |
| stopBroker(); |
| startBroker(); |
| Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1") |
| .receiverQueueSize(1).subscribe(); |
| |
| Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2") |
| .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe(); |
| |
| ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) |
| .enableBatching(false) |
| .messageRoutingMode(MessageRoutingMode.SinglePartition) |
| .create(); |
| @Cleanup("shutdownNow") |
| ExecutorService executor = Executors.newFixedThreadPool(5); |
| byte[] msgData = "testData".getBytes(); |
| final int totalProduceMessages = 200; |
| CountDownLatch latch = new CountDownLatch(totalProduceMessages); |
| for (int i = 0; i < totalProduceMessages; i++) { |
| executor.submit(() -> { |
| producer.sendAsync(msgData).handle((msg, e) -> { |
| latch.countDown(); |
| return null; |
| }); |
| }); |
| } |
| latch.await(); |
| |
| NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); |
| pulsar.getBrokerService().updateRates(); |
| NonPersistentTopicStats stats = topic.getStats(false, false, false); |
| NonPersistentPublisherStats npStats = stats.getPublishers().get(0); |
| NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1"); |
| NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2"); |
| assertTrue(npStats.getMsgDropRate() > 0); |
| assertTrue(sub1Stats.getMsgDropRate() > 0); |
| assertTrue(sub2Stats.getMsgDropRate() > 0); |
| |
| producer.close(); |
| consumer.close(); |
| consumer2.close(); |
| } finally { |
| conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate); |
| } |
| } |
| |
| class ReplicationClusterManager { |
| URL url1; |
| PulsarService pulsar1; |
| BrokerService ns1; |
| |
| PulsarAdmin admin1; |
| LocalBookkeeperEnsemble bkEnsemble1; |
| |
| URL url2; |
| ServiceConfiguration config2; |
| PulsarService pulsar2; |
| BrokerService ns2; |
| PulsarAdmin admin2; |
| LocalBookkeeperEnsemble bkEnsemble2; |
| |
| URL url3; |
| ServiceConfiguration config3; |
| PulsarService pulsar3; |
| BrokerService ns3; |
| PulsarAdmin admin3; |
| LocalBookkeeperEnsemble bkEnsemble3; |
| |
| ZookeeperServerTest globalZkS; |
| |
| ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<Runnable>()); |
| |
| static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5; |
| |
| // Default frequency |
| public int getBrokerServicePurgeInactiveFrequency() { |
| return 60; |
| } |
| |
| public boolean isBrokerServicePurgeInactiveTopic() { |
| return false; |
| } |
| |
| void setupReplicationCluster() throws Exception { |
| log.info("--- Starting ReplicatorTestBase::setup ---"); |
| globalZkS = new ZookeeperServerTest(0); |
| globalZkS.start(); |
| |
| // Start region 1 |
| bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0); |
| bkEnsemble1.start(); |
| |
| // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have |
| // completely |
| // independent config objects instead of referring to the same properties object |
| ServiceConfiguration config1 = new ServiceConfiguration(); |
| config1.setClusterName(configClusterName); |
| config1.setAdvertisedAddress("localhost"); |
| config1.setWebServicePort(Optional.of(0)); |
| config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort()); |
| config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); |
| config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); |
| config1.setBrokerDeleteInactiveTopicsFrequencySeconds( |
| inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); |
| config1.setBrokerShutdownTimeoutMs(0L); |
| config1.setBrokerServicePort(Optional.of(0)); |
| config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); |
| config1.setAllowAutoTopicCreationType("non-partitioned"); |
| pulsar1 = new PulsarService(config1); |
| pulsar1.start(); |
| ns1 = pulsar1.getBrokerService(); |
| |
| url1 = new URL(pulsar1.getWebServiceAddress()); |
| admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build(); |
| |
| // Start region 2 |
| |
| // Start zk & bks |
| bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); |
| bkEnsemble2.start(); |
| |
| config2 = new ServiceConfiguration(); |
| config2.setClusterName("r2"); |
| config2.setWebServicePort(Optional.of(0)); |
| config2.setAdvertisedAddress("localhost"); |
| config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort()); |
| config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); |
| config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); |
| config2.setBrokerDeleteInactiveTopicsFrequencySeconds( |
| inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); |
| config2.setBrokerShutdownTimeoutMs(0L); |
| config2.setBrokerServicePort(Optional.of(0)); |
| config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); |
| config2.setAllowAutoTopicCreationType("non-partitioned"); |
| pulsar2 = new PulsarService(config2); |
| pulsar2.start(); |
| ns2 = pulsar2.getBrokerService(); |
| |
| url2 = new URL(pulsar2.getWebServiceAddress()); |
| admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); |
| |
| // Start region 3 |
| |
| // Start zk & bks |
| bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0); |
| bkEnsemble3.start(); |
| |
| config3 = new ServiceConfiguration(); |
| config3.setClusterName("r3"); |
| config3.setWebServicePort(Optional.of(0)); |
| config3.setAdvertisedAddress("localhost"); |
| config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort()); |
| config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); |
| config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); |
| config3.setBrokerDeleteInactiveTopicsFrequencySeconds( |
| inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); |
| config3.setBrokerShutdownTimeoutMs(0L); |
| config3.setBrokerServicePort(Optional.of(0)); |
| config3.setAllowAutoTopicCreationType("non-partitioned"); |
| pulsar3 = new PulsarService(config3); |
| pulsar3.start(); |
| ns3 = pulsar3.getBrokerService(); |
| |
| url3 = new URL(pulsar3.getWebServiceAddress()); |
| admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build(); |
| |
| // Provision the global namespace |
| admin1.clusters().createCluster("r1", ClusterData.builder() |
| .serviceUrl(url1.toString()) |
| .brokerServiceUrl(pulsar1.getBrokerServiceUrl()) |
| .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()) |
| .build()); |
| admin1.clusters().createCluster("r2", ClusterData.builder() |
| .serviceUrl(url2.toString()) |
| .brokerServiceUrl(pulsar2.getBrokerServiceUrl()) |
| .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()) |
| .build()); |
| admin1.clusters().createCluster("r3", ClusterData.builder() |
| .serviceUrl(url3.toString()) |
| .brokerServiceUrl(pulsar3.getBrokerServiceUrl()) |
| .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()) |
| .build()); |
| |
| admin1.clusters().createCluster("global", ClusterData.builder().serviceUrl("http://global:8080").build()); |
| admin1.tenants().createTenant("pulsar", new TenantInfoImpl( |
| Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3"))); |
| admin1.namespaces().createNamespace("pulsar/global/ns"); |
| admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", |
| Sets.newHashSet("r1", "r2", "r3")); |
| |
| assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString()); |
| assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString()); |
| assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString()); |
| assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl()); |
| assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()); |
| assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl()); |
| Thread.sleep(100); |
| log.info("--- ReplicatorTestBase::setup completed ---"); |
| |
| } |
| |
| private int inSec(int time, TimeUnit unit) { |
| return (int) TimeUnit.SECONDS.convert(time, unit); |
| } |
| |
| void shutdownReplicationCluster() throws Exception { |
| log.info("--- Shutting down ---"); |
| executor.shutdownNow(); |
| |
| admin1.close(); |
| admin2.close(); |
| admin3.close(); |
| |
| pulsar3.close(); |
| ns3.close(); |
| |
| pulsar2.close(); |
| ns2.close(); |
| |
| pulsar1.close(); |
| ns1.close(); |
| |
| bkEnsemble1.stop(); |
| bkEnsemble2.stop(); |
| bkEnsemble3.stop(); |
| globalZkS.stop(); |
| } |
| } |
| |
| private void rolloverPerIntervalStats(PulsarService pulsar) { |
| try { |
| pulsar.getExecutor().submit(() -> pulsar.getBrokerService().updateRates()).get(); |
| } catch (Exception e) { |
| log.error("Stats executor error", e); |
| } |
| } |
| } |