/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.awaitility.Awaitility.await;

@Test(groups = "flaky")
public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
    // SLF4J logger shared by the tests in this class (start/exit banners and per-message debug output).
    private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);

    /**
     * Checks that a subscription-level dispatch limit keeps the consumer from receiving everything
     * that was published. The limit is configured either as a message rate or as a byte rate,
     * depending on {@code dispatchRateType}.
     *
     * @param subscription subscription type supplied by the data provider
     * @param dispatchRateType selects whether the limit is expressed in messages or in bytes
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 5000)
    public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscription,
                                                             DispatchRateType dispatchRateType) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final String subName = "my-subscriber-name";

        final int messageRate = 100;
        // The same numeric limit goes to whichever dimension is under test; the other one is set to -1.
        final boolean limitByMessages = DispatchRateType.messageRate.equals(dispatchRateType);
        final DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(limitByMessages ? messageRate : -1)
                .dispatchThrottlingRateInByte(limitByMessages ? -1 : messageRate)
                .ratePeriodInSecond(360)
                .build();

        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

        // Producer first, then the broker-side topic handle, then the consumer.
        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        final PersistentTopic topic =
                (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Throttling has to be switched on for consumers without backlog as well.
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        final AtomicInteger totalReceived = new AtomicInteger(0);

        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(subscription)
                .messageListener((listenerConsumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    String receivedMessage = new String(message.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                })
                .subscribe();

        // Only the two persistent dispatcher implementations are expected here.
        final Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
        if (!(subDispatcher instanceof PersistentDispatcherMultipleConsumers
                || subDispatcher instanceof PersistentDispatcherSingleActiveConsumer)) {
            Assert.fail("Should only have PersistentDispatcher in this test");
        }
        final DispatchRateLimiter subRateLimiter = subDispatcher.getRateLimiter().get();

        // Poll up to five times, 100 ms apart, until the limiter reports a positive rate.
        boolean rateApplied = false;
        final int maxChecks = 5;
        for (int check = 1; check <= maxChecks && !rateApplied; check++) {
            rateApplied = subRateLimiter.getDispatchRateOnMsg() > 0
                    || subRateLimiter.getDispatchRateOnByte() > 0;
            if (!rateApplied && check < maxChecks) {
                Thread.sleep(100);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);

        // Publish 500 messages of 80 bytes each, one send() call at a time.
        final int numMessages = 500;
        for (int sent = 0; sent < numMessages; sent++) {
            producer.send(new byte[80]);
        }

        // With the limit in place the listener must have seen fewer than twice the configured rate.
        final int receivedSoFar = totalReceived.get();
        Assert.assertTrue(receivedSoFar < messageRate * 2);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Verifies that a message-rate limit on the subscription still lets every message through.
     *
     * <pre>
     *  1. the subscription dispatch rate is set to 10 msg per 1-second period
     *  2. 30 messages are published
     *  3. the test waits until the listener has counted all 30, then checks the received total
     * </pre>
     *
     * @param subscription subscription type supplied by the data provider
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription)
        throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingAll";
        final String subName = "my-subscriber-name";

        final int messageRate = 10;
        final DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(messageRate)
                .dispatchThrottlingRateInByte(-1)
                .ratePeriodInSecond(1)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

        final int numProducedMessages = 30;
        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
        final AtomicInteger totalReceived = new AtomicInteger(0);

        // Throttling has to be switched on for consumers without backlog as well.
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        // The consumer is subscribed before the producer is created.
        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(subscription)
                .messageListener((listenerConsumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    String receivedMessage = new String(message.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                })
                .subscribe();

        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        final PersistentTopic topic =
                (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Only the two persistent dispatcher implementations are expected here.
        final Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
        if (!(subDispatcher instanceof PersistentDispatcherMultipleConsumers
                || subDispatcher instanceof PersistentDispatcherSingleActiveConsumer)) {
            Assert.fail("Should only have PersistentDispatcher in this test");
        }
        final DispatchRateLimiter subRateLimiter = subDispatcher.getRateLimiter().get();

        // Poll up to five times, 100 ms apart, until the limiter reports a positive rate.
        boolean rateApplied = false;
        final int maxChecks = 5;
        for (int check = 1; check <= maxChecks && !rateApplied; check++) {
            rateApplied = subRateLimiter.getDispatchRateOnMsg() > 0
                    || subRateLimiter.getDispatchRateOnByte() > 0;
            if (!rateApplied && check < maxChecks) {
                Thread.sleep(100);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);

        // Publish the numbered payloads one send() call at a time.
        for (int sent = 0; sent < numProducedMessages; sent++) {
            final String message = "my-message-" + sent;
            producer.send(message.getBytes());
        }

        // Block until the listener has counted every message down.
        await().until(() -> latch.getCount() == 0);
        Assert.assertEquals(totalReceived.get(), numProducedMessages);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Verifies that a byte-rate limit on the subscription slows delivery down without losing messages.
     *
     * <pre>
     *  1. the subscription dispatch rate is set to 1000 bytes per 1-second period
     *  2. 30 messages of 100 bytes each are published
     *  3. all of them must arrive, and receiving them must take at least 2 seconds
     * </pre>
     *
     * @param subscription subscription type supplied by the data provider
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
        final String subName = "my-subscriber-name-" + subscription;

        final int byteRate = 1000;
        final DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(-1)
                .dispatchThrottlingRateInByte(byteRate)
                .ratePeriodInSecond(1)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

        final int numProducedMessages = 30;
        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
        final AtomicInteger totalReceived = new AtomicInteger(0);

        // Throttling has to be switched on for consumers without backlog as well.
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(10)
                .subscriptionType(subscription)
                .messageListener((listenerConsumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    String receivedMessage = new String(message.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                })
                .subscribe();

        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        final PersistentTopic topic =
                (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Only the two persistent dispatcher implementations are expected here.
        final Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
        if (!(subDispatcher instanceof PersistentDispatcherMultipleConsumers
                || subDispatcher instanceof PersistentDispatcherSingleActiveConsumer)) {
            Assert.fail("Should only have PersistentDispatcher in this test");
        }
        final DispatchRateLimiter subRateLimiter = subDispatcher.getRateLimiter().get();

        // Poll up to five times, 100 ms apart, until the limiter reports a positive rate.
        boolean rateApplied = false;
        final int maxChecks = 5;
        for (int check = 1; check <= maxChecks && !rateApplied; check++) {
            rateApplied = subRateLimiter.getDispatchRateOnMsg() > 0
                    || subRateLimiter.getDispatchRateOnByte() > 0;
            if (!rateApplied && check < maxChecks) {
                Thread.sleep(100);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);

        final int payloadSize = byteRate / 10;
        final long startMillis = System.currentTimeMillis();
        // Publish the fixed-size payloads one send() call at a time.
        for (int sent = 0; sent < numProducedMessages; sent++) {
            producer.send(new byte[payloadSize]);
        }
        latch.await();
        // Three numeric arguments bind to TestNG's delta overload: the count may be off by up to 10.
        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
        final long endMillis = System.currentTimeMillis();
        final long elapsedMillis = endMillis - startMillis;
        log.info("-- end - start: {} ", elapsedMillis);

        // first 10 messages, which equals receiverQueueSize, will not wait.
        Assert.assertTrue(elapsedMillis >= 2000);

        consumer.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Runs one byte-rate throttling scenario with limits configured at broker level, through the
     * namespace dispatch-rate policy and through the namespace subscription-dispatch-rate policy,
     * then checks how long it takes to receive 30 messages. The topic and the namespace are deleted
     * at the end so the method can be called repeatedly.
     *
     * @param subscription subscription type used by the consumer
     * @param brokerRate value written to the broker dynamic setting {@code dispatchThrottlingRateInByte}
     * @param topicRate byte rate applied via {@code setDispatchRate} on the namespace
     * @param subRate byte rate applied via {@code setSubscriptionDispatchRate} on the namespace
     * @param expectRate rate used to size the payloads: each message carries {@code expectRate / 10} bytes
     * @throws Exception if any admin, producer or consumer operation fails
     */
    private void testDispatchRate(SubscriptionType subscription,
                                  int brokerRate, int topicRate, int subRate, int expectRate) throws Exception {

        final String namespace = "my-property/throttling_ns";
        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
        final String subName = "my-subscriber-name-" + subscription;

        final DispatchRate subscriptionDispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(-1)
                .dispatchThrottlingRateInByte(subRate)
                .ratePeriodInSecond(1)
                .build();
        final DispatchRate topicDispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(-1)
                .dispatchThrottlingRateInByte(topicRate)
                .ratePeriodInSecond(1)
                .build();

        // Apply the three limits: subscription policy, namespace dispatch policy, broker dynamic config.
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", String.valueOf(brokerRate));

        final int numProducedMessages = 30;
        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
        final AtomicInteger totalReceived = new AtomicInteger(0);

        // Throttling has to be switched on for consumers without backlog as well.
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .receiverQueueSize(10)
                .subscriptionType(subscription)
                .messageListener((listenerConsumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    String receivedMessage = new String(message.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                })
                .subscribe();

        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        final PersistentTopic topic =
                (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Only the two persistent dispatcher implementations are expected here.
        final Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
        if (!(subDispatcher instanceof PersistentDispatcherMultipleConsumers
                || subDispatcher instanceof PersistentDispatcherSingleActiveConsumer)) {
            Assert.fail("Should only have PersistentDispatcher in this test");
        }
        final DispatchRateLimiter subDispatchRateLimiter = subDispatcher.getRateLimiter().get();

        // Give the broker up to 500 ms to report a positive byte rate on all three limiters.
        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
            final DispatchRateLimiter brokerLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
            Assert.assertTrue(brokerLimiter != null
                    && brokerLimiter.getDispatchRateOnByte() > 0);
            final DispatchRateLimiter topicLimiter = topic.getDispatchRateLimiter().orElse(null);
            Assert.assertTrue(topicLimiter != null
                    && topicLimiter.getDispatchRateOnByte() > 0);
            Assert.assertTrue(subDispatchRateLimiter != null
                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
        });

        // The policies read back through the admin API must carry the configured byte rates.
        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
                .getDispatchThrottlingRateInByte(), subRate);
        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
                .getDispatchThrottlingRateInByte(), topicRate);

        final int payloadSize = expectRate / 10;
        final long startMillis = System.currentTimeMillis();
        // Publish the fixed-size payloads one send() call at a time.
        for (int sent = 0; sent < numProducedMessages; sent++) {
            producer.send(new byte[payloadSize]);
        }
        latch.await();
        // Three numeric arguments bind to TestNG's delta overload: the count may be off by up to 10.
        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
        final long endMillis = System.currentTimeMillis();
        final long elapsedMillis = endMillis - startMillis;
        log.info("-- end - start: {} ", elapsedMillis);

        // first 10 messages, which equals receiverQueueSize, will not wait.
        Assert.assertTrue(elapsedMillis >= 2500);
        Assert.assertTrue(elapsedMillis <= 8000);

        consumer.close();
        producer.close();
        // Remove the topic and namespace so the next scenario can create the namespace again.
        admin.topics().delete(topicName, true);
        admin.namespaces().deleteNamespace(namespace);
    }

    /**
     * Verifies rate limiting when broker-level, topic-level and subscription-level byte limits are
     * all enabled at once.
     *
     * <pre>
     *  1. Configure the three levels with different byte rates, rotating which level is the lowest.
     *  2. For each combination, run one consumer on one topic via {@code testDispatchRate}.
     *  3. In every combination the lowest configured rate (1000) is passed as the expected rate.
     * </pre>
     *
     * @param subscription subscription type supplied by the data provider
     * @throws Exception if any scenario fails
     */
    @Test(dataProvider = "subscriptions")
    public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        // Each row is {brokerRate, topicRate, subRate}; the minimum moves from level to level.
        final int[][] rateCombinations = {
            {1000, 5000, 10000},
            {10000, 1000, 5000},
            {5000, 10000, 1000},
        };
        final int expectedRate = 1000;
        for (int[] rates : rateCombinations) {
            testDispatchRate(subscription, rates[0], rates[1], rates[2], expectedRate);
        }

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Verifies that the broker-level byte-rate limit throttles dispatching across topics.
     *
     * <pre>
     *  1. The broker dynamic setting dispatchThrottlingRateInByte is set to 1000.
     *  2. Two consumers are started, one per topic, each topic in its own namespace.
     *  3. 15 messages of 100 bytes are sent to each topic, i.e. 3000 bytes in total.
     *  4. All messages must arrive, and receiving them must take at least 2 seconds.
     * </pre>
     *
     * @param subscription subscription type supplied by the data provider
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(dataProvider = "subscriptions", timeOut = 8000)
    public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace1 = "my-property/throttling_ns1";
        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/throttlingAll");
        final String namespace2 = "my-property/throttling_ns2";
        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
        final String subName = "my-subscriber-name-" + subscription;

        final int byteRate = 1000;
        // NOTE(review): this broker-wide setting is not restored in this method — confirm the base
        // class resets it between tests.
        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
        admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
        admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test"));

        final int numProducedMessagesEachTopic = 15;
        final int numProducedMessages = numProducedMessagesEachTopic * 2;
        // Both listeners count down the same latch and bump the same counter.
        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
        final AtomicInteger totalReceived = new AtomicInteger(0);
        // enable throttling for nonBacklog consumers
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
                .receiverQueueSize(10)
                .subscriptionType(subscription).messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in topic1", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                }).subscribe();

        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
                .receiverQueueSize(10)
                .subscriptionType(subscription).messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in topic2", receivedMessage);
                    totalReceived.incrementAndGet();
                    latch.countDown();
                }).subscribe();

        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1).create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();

        // Wait (at most 500 ms) until the broker-level limiter exists and reports a positive byte rate.
        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
            DispatchRateLimiter rateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
            Assert.assertTrue(rateLimiter != null
                    && rateLimiter.getDispatchRateOnByte() > 0);
        });

        long start = System.currentTimeMillis();
        // Publish to both topics alternately, one send() call at a time.
        for (int i = 0; i < numProducedMessagesEachTopic; i++) {
            producer1.send(new byte[byteRate / 10]);
            producer2.send(new byte[byteRate / 10]);
        }
        latch.await();
        // Three numeric arguments bind to TestNG's delta overload: the count may be off by up to 10.
        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
        long end = System.currentTimeMillis();
        log.info("-- time to receive all messages: {} ", end - start);

        // first 10 messages, which equals receiverQueueSize, will not wait.
        Assert.assertTrue((end - start) >= 2000);

        consumer1.close();
        consumer2.close();
        producer1.close();
        producer2.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Verifies that the subscription message-rate limit holds when five consumers are attached to
     * the same Shared subscription: after publishing 500 messages and waiting half a second, the
     * listeners must not have received all of them.
     *
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(timeOut = 5000)
    public void testRateLimitingMultipleConsumers() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingMultipleConsumers";
        final String subName = "my-subscriber-name";

        final int messageRate = 5;
        final DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(messageRate)
                .dispatchThrottlingRateInByte(-1)
                .ratePeriodInSecond(360)
                .build();
        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
        admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

        final int numProducedMessages = 500;
        final AtomicInteger totalReceived = new AtomicInteger(0);

        // Throttling has to be switched on for consumers without backlog as well.
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        // One builder is reused for all five consumers, so they share the listener and the counter.
        final ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((listenerConsumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    String receivedMessage = new String(message.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                });
        final Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
        final Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
        final Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
        final Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
        final Consumer<byte[]> consumer5 = consumerBuilder.subscribe();

        final Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        final PersistentTopic topic =
                (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

        // Only the two persistent dispatcher implementations are expected here.
        final Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
        if (!(subDispatcher instanceof PersistentDispatcherMultipleConsumers
                || subDispatcher instanceof PersistentDispatcherSingleActiveConsumer)) {
            Assert.fail("Should only have PersistentDispatcher in this test");
        }
        final DispatchRateLimiter subRateLimiter = subDispatcher.getRateLimiter().get();

        // Poll up to five times, 100 ms apart, until the limiter reports a positive rate.
        boolean rateApplied = false;
        final int maxChecks = 5;
        for (int check = 1; check <= maxChecks && !rateApplied; check++) {
            rateApplied = subRateLimiter.getDispatchRateOnMsg() > 0
                    || subRateLimiter.getDispatchRateOnByte() > 0;
            if (!rateApplied && check < maxChecks) {
                Thread.sleep(100);
            }
        }
        Assert.assertTrue(rateApplied);
        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace), dispatchRate);

        // Publish the numbered payloads one send() call at a time.
        for (int sent = 0; sent < numProducedMessages; sent++) {
            final String message = "my-message-" + sent;
            producer.send(message.getBytes());
        }

        // Leave the consumers time to drain whatever the dispatcher is willing to hand out.
        Thread.sleep(500);

        // Because of the limit, the received total must differ from the number published.
        final int receivedSoFar = totalReceived.get();
        Assert.assertNotEquals(receivedSoFar, numProducedMessages);

        consumer1.close();
        consumer2.close();
        consumer3.close();
        consumer4.close();
        consumer5.close();
        producer.close();
        log.info("-- Exiting {} test --", methodName);
    }


    /**
     * Verifies that the broker-wide dynamic setting {@code dispatchThrottlingRatePerSubscriptionInMsg}
     * throttles a subscription when no namespace policy is set: after publishing 500 messages and
     * waiting half a second, the consumer must not have received all of them.
     *
     * <p>The original in-memory value of the setting is restored, and the consumer and producer are
     * closed, even when an assertion fails, so a failure here does not leave a throttled broker
     * configuration behind for the tests that run afterwards.
     *
     * @param subscription subscription type supplied by the data provider
     * @throws Exception if any admin, producer or consumer operation fails
     */
    @Test(dataProvider = "subscriptions", timeOut = 5000)
    public void testClusterRateLimitingConfiguration(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName = "persistent://" + namespace + "/throttlingBlock";
        final String subName = "my-subscriber-name";

        final int messageRate = 5;
        // enable throttling for nonBacklog consumers
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        final int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
        Producer<byte[]> producer = null;
        Consumer<byte[]> consumer = null;
        try {
            // (1) Update message-dispatch-rate limit
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
                Integer.toString(messageRate));
            // The update is propagated asynchronously: poll with a growing pause and stop as soon as
            // the in-memory configuration no longer shows the initial value.
            for (int i = 0; i < 5; i++) {
                if (pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg() != initValue) {
                    break;
                }
                Thread.sleep(50 + (i * 10));
            }
            Assert.assertNotEquals(pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
                initValue);

            admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
            // create producer and topic
            producer = pulsarClient.newProducer().topic(topicName).create();
            // The lookup is kept for its effect on the broker; its result is not needed by this test.
            pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            final int numMessages = 500;

            final AtomicInteger totalReceived = new AtomicInteger(0);

            consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .subscriptionType(subscription).messageListener((c1, msg) -> {
                    Assert.assertNotNull(msg, "Message cannot be null");
                    String receivedMessage = new String(msg.getData());
                    log.debug("Received message [{}] in the listener", receivedMessage);
                    totalReceived.incrementAndGet();
                }).subscribe();

            // Publish the numbered payloads one send() call at a time.
            for (int i = 0; i < numMessages; i++) {
                final String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            // it can make sure that consumer had enough time to consume message but couldn't consume due to throttling
            Thread.sleep(500);

            // consumer should not have received all published message due to message-rate throttling
            Assert.assertNotEquals(totalReceived.get(), numMessages);
        } finally {
            try {
                // Same close order as on the success path: consumer first, then producer.
                if (consumer != null) {
                    consumer.close();
                }
                if (producer != null) {
                    producer.close();
                }
            } finally {
                // Always put the original limit back, even if closing the clients failed.
                pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
            }
        }
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * <pre>
     * It verifies that the cluster-throttling value gets considered when namespace-policy throttling is disabled.
     *
     *  1. Update cluster-throttling-config: subscription rate-limiter has cluster-config
     *  2. Update namespace-throttling-config: subscription rate-limiter has namespace-config
     *  3. Disable namespace-throttling-config: subscription rate-limiter has cluster-config again
     *  4. Create new topic with disabled namespace-config and enabled cluster-config: it takes cluster-config
     *
     * </pre>
     *
     * @throws Exception if any admin, broker or client call made by the test fails
     */
    @Test
    public void testClusterPolicyOverrideConfiguration() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String namespace = "my-property/throttling_ns";
        final String topicName1 = "persistent://" + namespace + "/throttlingOverride1";
        final String topicName2 = "persistent://" + namespace + "/throttlingOverride2";
        final String subName1 = "my-subscriber-name1";
        final String subName2 = "my-subscriber-name2";

        final int clusterMessageRate = 100;
        // enable throttling for nonBacklog consumers
        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);

        final int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
        try {
            // (1) Update message-dispatch-rate limit
            admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
                Integer.toString(clusterMessageRate));
            // zk-watch notification is async and may take some time, so poll until the new value is visible
            await().untilAsserted(() -> Assert.assertEquals(
                    pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), clusterMessageRate));

            admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));

            // create producer and topic
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName1).create();
            PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get();

            Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName1)
                .subscribe();

            final Dispatcher subDispatcher1 = topic.getSubscription(subName1).getDispatcher();
            final DispatchRateLimiter initialRateLimiter = requirePersistentDispatcherRateLimiter(subDispatcher1);

            // (1) Update dispatch rate on cluster-config update
            Assert.assertEquals(initialRateLimiter.getDispatchRateOnMsg(), clusterMessageRate);

            // (2) Update namespace throttling limit
            final int nsMessageRate = 500;
            DispatchRate dispatchRate = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(nsMessageRate)
                    .dispatchThrottlingRateInByte(0)
                    .ratePeriodInSecond(1)
                    .build();
            admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);

            // re-read the limiter after the policy change; steps (2) and (3) both observe this instance
            final DispatchRateLimiter subRateLimiter = subDispatcher1.getRateLimiter().get();
            await().untilAsserted(() ->
                    Assert.assertEquals(subRateLimiter.getDispatchRateOnMsg(), nsMessageRate));

            // (3) Disable namespace throttling limit will force to take cluster-config
            dispatchRate = DispatchRate.builder()
                    .dispatchThrottlingRateInMsg(0)
                    .dispatchThrottlingRateInByte(0)
                    .ratePeriodInSecond(1)
                    .build();
            admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
            await().untilAsserted(() ->
                    Assert.assertEquals(subRateLimiter.getDispatchRateOnMsg(), clusterMessageRate));

            // (4) Namespace throttling is disabled so, new topic should take cluster throttling limit
            Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
            PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get();
            Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName2)
                .subscribe();

            final Dispatcher subDispatcher2 = topic2.getSubscription(subName2).getDispatcher();
            final DispatchRateLimiter newTopicRateLimiter = requirePersistentDispatcherRateLimiter(subDispatcher2);

            Assert.assertEquals(newTopicRateLimiter.getDispatchRateOnMsg(), clusterMessageRate);

            consumer1.close();
            consumer2.close();
            producer.close();
            producer2.close();
        } finally {
            // put the broker-level rate back, even on failure, so this test does not leave it modified
            pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
        }

        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * Returns the rate limiter of the given dispatcher, failing the test when the dispatcher is neither a
     * {@link PersistentDispatcherMultipleConsumers} nor a {@link PersistentDispatcherSingleActiveConsumer}.
     *
     * @param dispatcher dispatcher taken from the subscription under test
     * @return the rate limiter currently held by {@code dispatcher}
     */
    private static DispatchRateLimiter requirePersistentDispatcherRateLimiter(Dispatcher dispatcher) {
        if (!(dispatcher instanceof PersistentDispatcherMultipleConsumers)
                && !(dispatcher instanceof PersistentDispatcherSingleActiveConsumer)) {
            Assert.fail("Should only have PersistentDispatcher in this test");
        }
        return dispatcher.getRateLimiter().get();
    }

    /**
     * Publishes and consumes a small batch of messages on a namespace that has a subscription dispatch rate
     * configured, then closes the producer and consumer, disconnects the subscription and checks that the
     * dispatcher's rate limiter reports {@code -1} for both its message and byte rates.
     *
     * @param subscription subscription type supplied by the "subscriptions" data provider
     * @throws Exception if any admin, broker or client call made by the test fails
     */
    @Test(dataProvider = "subscriptions", timeOut = 10000)
    public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String typeName = subscription.name();
        final String ns = "my-property/throttling_ns";
        final String topicUrl = "persistent://" + ns + "/closingSubRateLimiter" + typeName;
        final String subscriptionName = "mySubscription" + typeName;
        final int messageCount = 10;

        // Namespace with a subscription-level dispatch rate, so the dispatcher gets a rate limiter.
        admin.namespaces().createNamespace(ns, Sets.newHashSet("test"));
        admin.namespaces().setSubscriptionDispatchRate(ns, DispatchRate.builder()
                .dispatchThrottlingRateInMsg(10)
                .dispatchThrottlingRateInByte(1024)
                .ratePeriodInSecond(1)
                .build());

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicUrl).create();
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicUrl)
                .subscriptionName(subscriptionName)
                .subscriptionType(subscription)
                .subscribe();

        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicUrl).get();
        PersistentSubscription sub = topic.getSubscription(subscriptionName);

        // Publish the whole batch first ...
        int sent = 0;
        while (sent < messageCount) {
            producer.send(("my-message-" + sent).getBytes());
            sent++;
        }

        // ... then receive and acknowledge exactly as many messages as were published.
        for (int remaining = messageCount; remaining > 0; remaining--) {
            Message<byte[]> received = consumer.receive();
            consumer.acknowledge(received);
        }

        // Keep a reference to the limiter before tearing things down so it can be inspected afterwards.
        Dispatcher dispatcher = sub.getDispatcher();
        Assert.assertTrue(dispatcher.getRateLimiter().isPresent());
        DispatchRateLimiter limiter = dispatcher.getRateLimiter().get();

        producer.close();
        consumer.close();
        sub.disconnect().get();

        // Make sure that the rate limiter is closed
        Assert.assertEquals(limiter.getDispatchRateOnMsg(), -1);
        Assert.assertEquals(limiter.getDispatchRateOnByte(), -1);

        log.info("-- Exiting {} test --", methodName);
    }
}
