blob: 9e65eecb8eea4d429b06000f48fd447f8fd3f92b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.awaitility.Awaitility.await;
@Test(groups = "flaky")
public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
/**
* verify: consumer should not receive all messages due to message-rate throttling
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 5000)
public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscription,
DispatchRateType dispatchRateType) throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingBlock";
final String subName = "my-subscriber-name";
final int messageRate = 100;
DispatchRate dispatchRate = null;
if (DispatchRateType.messageRate.equals(dispatchRateType)) {
dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(messageRate)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(360)
.build();
} else {
dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(messageRate)
.ratePeriodInSecond(360)
.build();
}
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
// create producer, topic and consumer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
final AtomicInteger totalReceived = new AtomicInteger(0);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
}).subscribe();
DispatchRateLimiter subRateLimiter = null;
Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
boolean isMessageRateUpdate = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (subRateLimiter.getDispatchRateOnMsg() > 0
|| subRateLimiter.getDispatchRateOnByte() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);
int numMessages = 500;
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
producer.send(new byte[80]);
}
// consumer should not have received all published message due to message-rate throttling
Assert.assertTrue(totalReceived.get() < messageRate * 2);
consumer.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}
/**
* verify rate-limiting should throttle message-dispatching based on message-rate
*
* <pre>
* 1. dispatch-msg-rate = 10 msg/sec
* 2. send 30 msgs
* 3. it should take up to 2 second to receive all messages
* </pre>
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions", timeOut = 5000)
public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription)
throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingAll";
final String subName = "my-subscriber-name";
final int messageRate = 10;
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(messageRate)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(1)
.build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
final int numProducedMessages = 30;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
final AtomicInteger totalReceived = new AtomicInteger(0);
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
latch.countDown();
}).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
DispatchRateLimiter subRateLimiter = null;
Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
boolean isMessageRateUpdate = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (subRateLimiter.getDispatchRateOnMsg() > 0
|| subRateLimiter.getDispatchRateOnByte() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);
// Asynchronously produce messages
for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}
await().until(() -> latch.getCount() == 0);
Assert.assertEquals(totalReceived.get(), numProducedMessages);
consumer.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}
/**
* verify rate-limiting should throttle message-dispatching based on byte-rate
*
* <pre>
* 1. dispatch-byte-rate = 1000 bytes/sec
* 2. send 30 msgs : each with 100 byte
* 3. it should take up to 2 second to receive all messages
* </pre>
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions", timeOut = 5000)
public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
final int byteRate = 1000;
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(byteRate)
.ratePeriodInSecond(1)
.build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
final int numProducedMessages = 30;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
final AtomicInteger totalReceived = new AtomicInteger(0);
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
latch.countDown();
}).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
DispatchRateLimiter subRateLimiter = null;
Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
boolean isMessageRateUpdate = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (subRateLimiter.getDispatchRateOnMsg() > 0
|| subRateLimiter.getDispatchRateOnByte() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);
long start = System.currentTimeMillis();
// Asynchronously produce messages
for (int i = 0; i < numProducedMessages; i++) {
producer.send(new byte[byteRate / 10]);
}
latch.await();
Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
long end = System.currentTimeMillis();
log.info("-- end - start: {} ", end - start);
// first 10 messages, which equals receiverQueueSize, will not wait.
Assert.assertTrue((end - start) >= 2000);
consumer.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}
private void testDispatchRate(SubscriptionType subscription,
int brokerRate, int topicRate, int subRate, int expectRate) throws Exception {
final String namespace = "my-property/throttling_ns";
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
DispatchRate subscriptionDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(subRate)
.ratePeriodInSecond(1)
.build();
DispatchRate topicDispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(-1)
.dispatchThrottlingRateInByte(topicRate)
.ratePeriodInSecond(1)
.build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
final int numProducedMessages = 30;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
final AtomicInteger totalReceived = new AtomicInteger(0);
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
latch.countDown();
}).subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
DispatchRateLimiter subRateLimiter = null;
Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
Assert.assertTrue(brokerDispatchRateLimiter != null
&& brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
Assert.assertTrue(topicDispatchRateLimiter != null
&& topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
Assert.assertTrue(subDispatchRateLimiter != null
&& subDispatchRateLimiter.getDispatchRateOnByte() > 0);
});
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
.getDispatchThrottlingRateInByte(), subRate);
Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
.getDispatchThrottlingRateInByte(), topicRate);
long start = System.currentTimeMillis();
// Asynchronously produce messages
for (int i = 0; i < numProducedMessages; i++) {
producer.send(new byte[expectRate / 10]);
}
latch.await();
Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
long end = System.currentTimeMillis();
log.info("-- end - start: {} ", end - start);
// first 10 messages, which equals receiverQueueSize, will not wait.
Assert.assertTrue((end - start) >= 2500);
Assert.assertTrue((end - start) <= 8000);
consumer.close();
producer.close();
admin.topics().delete(topicName, true);
admin.namespaces().deleteNamespace(namespace);
}
/**
* Verify whether rate-limiting works well when different levels rate-limiting enabled.
*
* <pre>
* 1. Set broker level, topic level and subscription level dispatch-byte-rate with different limit rate value.
* 2. Start one consumer for one topics.
* 3. the expect dispatch rate should be the minimum value of different limit rate.
* </pre>
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions")
public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
testDispatchRate(subscription, 1000, 5000, 10000, 1000);
testDispatchRate(subscription, 10000, 1000, 5000, 1000);
testDispatchRate(subscription, 5000, 10000, 1000, 1000);
log.info("-- Exiting {} test --", methodName);
}
/**
* Verify whether the broker level rate-limiting is throttle message-dispatching based on byte-rate or not
*
* <pre>
* 1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
* 2. Start two consumers for two topics.
* 3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, thus 3000 bytes in total.
* 4. It should take up to 2 seconds to receive all messages of the two topics.
* </pre>
*
* @param subscription
* @throws Exception
*/
@Test(dataProvider = "subscriptions", timeOut = 8000)
public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace1 = "my-property/throttling_ns1";
final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/throttlingAll");
final String namespace2 = "my-property/throttling_ns2";
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
final int byteRate = 1000;
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test"));
final int numProducedMessagesEachTopic = 15;
final int numProducedMessages = numProducedMessagesEachTopic * 2;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
final AtomicInteger totalReceived = new AtomicInteger(0);
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in topic1", receivedMessage);
totalReceived.incrementAndGet();
latch.countDown();
}).subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in topic2", receivedMessage);
totalReceived.incrementAndGet();
latch.countDown();
}).subscribe();
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1).create();
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
boolean isMessageRateUpdate = false;
DispatchRateLimiter dispatchRateLimiter;
Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
DispatchRateLimiter rateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
Assert.assertTrue(rateLimiter != null
&& rateLimiter.getDispatchRateOnByte() > 0);
});
long start = System.currentTimeMillis();
// Asynchronously produce messages
for (int i = 0; i < numProducedMessagesEachTopic; i++) {
producer1.send(new byte[byteRate / 10]);
producer2.send(new byte[byteRate / 10]);
}
latch.await();
Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
long end = System.currentTimeMillis();
log.info("-- time to receive all messages: {} ", end - start);
// first 10 messages, which equals receiverQueueSize, will not wait.
Assert.assertTrue((end - start) >= 2000);
consumer1.close();
consumer2.close();
producer1.close();
producer2.close();
log.info("-- Exiting {} test --", methodName);
}
/**
* verify message-rate on multiple consumers with shared-subscription
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testRateLimitingMultipleConsumers() throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";
final String subName = "my-subscriber-name";
final int messageRate = 5;
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(messageRate)
.dispatchThrottlingRateInByte(-1)
.ratePeriodInSecond(360)
.build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
final int numProducedMessages = 500;
final AtomicInteger totalReceived = new AtomicInteger(0);
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subName).subscriptionType(SubscriptionType.Shared).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
});
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
Consumer<byte[]> consumer5 = consumerBuilder.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
DispatchRateLimiter subRateLimiter = null;
Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
boolean isMessageRateUpdate = false;
int retry = 5;
for (int i = 0; i < retry; i++) {
if (subRateLimiter.getDispatchRateOnMsg() > 0
|| subRateLimiter.getDispatchRateOnByte() > 0) {
isMessageRateUpdate = true;
break;
} else {
if (i != retry - 1) {
Thread.sleep(100);
}
}
}
Assert.assertTrue(isMessageRateUpdate);
Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);
// Asynchronously produce messages
for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}
// it can make sure that consumer had enough time to consume message but couldn't consume due to throttling
Thread.sleep(500);
// consumer should not have received all published message due to message-rate throttling
Assert.assertNotEquals(totalReceived.get(), numProducedMessages);
consumer1.close();
consumer2.close();
consumer3.close();
consumer4.close();
consumer5.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}
@Test(dataProvider = "subscriptions", timeOut = 5000)
public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/throttlingBlock";
final String subName = "my-subscriber-name";
final int messageRate = 5;
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
// (1) Update message-dispatch-rate limit
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
Integer.toString(messageRate));
// sleep incrementally as zk-watch notification is async and may take some time
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg() == initValue) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), initValue);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
// create producer and topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
int numMessages = 500;
final AtomicInteger totalReceived = new AtomicInteger(0);
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
String receivedMessage = new String(msg.getData());
log.debug("Received message [{}] in the listener", receivedMessage);
totalReceived.incrementAndGet();
}).subscribe();
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}
// it can make sure that consumer had enough time to consume message but couldn't consume due to throttling
Thread.sleep(500);
// consumer should not have received all published message due to message-rate throttling
Assert.assertNotEquals(totalReceived.get(), numMessages);
consumer.close();
producer.close();
pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
log.info("-- Exiting {} test --", methodName);
}
/**
* <pre>
* It verifies that cluster-throttling value gets considered when namespace-policy throttling is disabled.
*
* 1. Update cluster-throttling-config: topic rate-limiter has cluster-config
* 2. Update namespace-throttling-config: topic rate-limiter has namespace-config
* 3. Disable namespace-throttling-config: topic rate-limiter has cluster-config
* 4. Create new topic with disable namespace-config and enabled cluster-config: it takes cluster-config
*
* </pre>
*
* @throws Exception
*/
@Test
public void testClusterPolicyOverrideConfiguration() throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName1 = "persistent://" + namespace + "/throttlingOverride1";
final String topicName2 = "persistent://" + namespace + "/throttlingOverride2";
final String subName1 = "my-subscriber-name1";
final String subName2 = "my-subscriber-name2";
final int clusterMessageRate = 100;
// enable throttling for nonBacklog consumers
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
// (1) Update message-dispatch-rate limit
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
Integer.toString(clusterMessageRate));
// sleep incrementally as zk-watch notification is async and may take some time
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg() == initValue) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), initValue);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
// create producer and topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName1).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get();
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName1)
.subscribe();
DispatchRateLimiter subRateLimiter = null;
Dispatcher subDispatcher = topic.getSubscription(subName1).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
// (1) Update dispatch rate on cluster-config update
Assert.assertEquals(clusterMessageRate, subRateLimiter.getDispatchRateOnMsg());
// (2) Update namespace throttling limit
int nsMessageRate = 500;
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(nsMessageRate)
.dispatchThrottlingRateInByte(0)
.ratePeriodInSecond(1)
.build();
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
subRateLimiter = subDispatcher.getRateLimiter().get();
for (int i = 0; i < 5; i++) {
if (subRateLimiter.getDispatchRateOnMsg() != nsMessageRate) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertEquals(nsMessageRate, subRateLimiter.getDispatchRateOnMsg());
// (3) Disable namespace throttling limit will force to take cluster-config
dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(0)
.dispatchThrottlingRateInByte(0)
.ratePeriodInSecond(1)
.build();
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
for (int i = 0; i < 5; i++) {
if (subRateLimiter.getDispatchRateOnMsg() == nsMessageRate) {
Thread.sleep(50 + (i * 10));
}
}
Assert.assertEquals(clusterMessageRate, subRateLimiter.getDispatchRateOnMsg());
// (5) Namespace throttling is disabled so, new topic should take cluster throttling limit
Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName2)
.subscribe();
subDispatcher = topic2.getSubscription(subName2).getDispatcher();
if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
subRateLimiter = subDispatcher.getRateLimiter().get();
} else {
Assert.fail("Should only have PersistentDispatcher in this test");
}
Assert.assertEquals(clusterMessageRate, subRateLimiter.getDispatchRateOnMsg());
producer.close();
producer2.close();
log.info("-- Exiting {} test --", methodName);
}
@Test(dataProvider = "subscriptions", timeOut = 10000)
public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
final String namespace = "my-property/throttling_ns";
final String topicName = "persistent://" + namespace + "/closingSubRateLimiter" + subscription.name();
final String subName = "mySubscription" + subscription.name();
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(1024)
.ratePeriodInSecond(1)
.build();
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(subscription).subscribe();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
PersistentSubscription sub = topic.getSubscription(subName);
final int numProducedMessages = 10;
for (int i = 0; i < numProducedMessages; i++) {
final String message = "my-message-" + i;
producer.send(message.getBytes());
}
for (int i = 0; i < numProducedMessages; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
}
Dispatcher dispatcher = sub.getDispatcher();
Assert.assertTrue(dispatcher.getRateLimiter().isPresent());
DispatchRateLimiter dispatchRateLimiter = dispatcher.getRateLimiter().get();
producer.close();
consumer.close();
sub.disconnect().get();
// Make sure that the rate limiter is closed
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
log.info("-- Exiting {} test --", methodName);
}
}