/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.admin;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Slf4j
@Test(groups = "broker")
public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {

    private final String testTenant = "my-tenant";

    private final String testNamespace = "my-namespace";

    private final String myNamespace = testTenant + "/" + testNamespace;

    private final String testTopic = "persistent://" + myNamespace + "/test-set-backlog-quota";

    private final String persistenceTopic = "persistent://" + myNamespace + "/test-set-persistence";

    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setDefaultNumberOfNamespaceBundles(1);
        super.internalSetup();

        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
        admin.tenants().createTenant(this.testTenant, tenantInfo);
        admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Sets.newHashSet("test"));
        admin.topics().createPartitionedTopic(testTopic, 2);
        Producer producer = pulsarClient.newProducer().topic(testTopic).create();
        producer.close();
        waitForZooKeeperWatchers();
    }

    @AfterMethod(alwaysRun = true)
    @Override
    public void cleanup() throws Exception {
        super.internalCleanup();
        this.resetConfig();
    }

    @Test
    public void testSetBacklogQuota() throws Exception {

        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setBacklogQuota(testTopic, backlogQuota);
        log.info("Backlog quota set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));

        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
        Assert.assertEquals(backlogQuota, backlogQuotaInManager);

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveBacklogQuota() throws Exception {
        BacklogQuota backlogQuota = new BacklogQuota(1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setBacklogQuota(testTopic, backlogQuota);
        log.info("Backlog quota set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));

        BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager();
        BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
        log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic);
        Assert.assertEquals(backlogQuota, backlogQuotaInManager);

        admin.topics().removeBacklogQuota(testTopic);
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage)));

        backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic));
        log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager,
                testTopic);
        Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager);

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testCheckBacklogQuota() throws Exception {
        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
        String namespace = TopicName.get(testTopic).getNamespace();
        admin.namespaces().setRetention(namespace, retentionPolicies);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies));

        BacklogQuota backlogQuota =
                new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
        try {
            admin.topics().setBacklogQuota(testTopic, backlogQuota);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        backlogQuota =
                new BacklogQuota(10 * 1024 * 1024 + 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
        try {
            admin.topics().setBacklogQuota(testTopic, backlogQuota);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        backlogQuota =
                new BacklogQuota(10 * 1024 * 1024 - 1, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
        admin.topics().setBacklogQuota(testTopic, backlogQuota);

        BacklogQuota finalBacklogQuota = backlogQuota;
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage), finalBacklogQuota));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test(timeOut = 20000)
    public void testGetBacklogQuotaApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap());
        assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap());
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf);
        assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap);
        BacklogQuota namespaceQuota = new BacklogQuota(30L, BacklogQuota.RetentionPolicy.producer_exception);

        admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota);
        Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> namespaceQuotaMap = Maps.newHashMap();
        namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota);
        assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap);

        BacklogQuota topicQuota = new BacklogQuota(40L, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        admin.topics().setBacklogQuota(topic, topicQuota);
        Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
        Map<BacklogQuota.BacklogQuotaType, BacklogQuota> topicQuotaMap = Maps.newHashMap();
        topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota);
        assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap);

        admin.namespaces().removeBacklogQuota(myNamespace);
        admin.topics().removeBacklogQuota(topic);
        Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty()));
        Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty()));
        assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap);
    }

    @Test
    public void testCheckBacklogQuotaFailed() throws Exception {
        RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10);
        String namespace = TopicName.get(testTopic).getNamespace();
        admin.namespaces().setRetention(namespace, retentionPolicies);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies));

        BacklogQuota backlogQuota =
                new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);
        try {
            admin.topics().setBacklogQuota(testTopic, backlogQuota);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }
        //Ensure that the cache has not been updated after a long time
        Awaitility.await().atLeast(1, TimeUnit.SECONDS);
        assertNull(admin.topics().getBacklogQuotaMap(testTopic)
                .get(BacklogQuota.BacklogQuotaType.destination_storage));
    }

    @Test
    public void testCheckRetention() throws Exception {
        BacklogQuota backlogQuota =
                new BacklogQuota(10 * 1024 * 1024, BacklogQuota.RetentionPolicy.consumer_backlog_eviction);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setBacklogQuota(testTopic, backlogQuota);
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                        .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));

        RetentionPolicies retention = new RetentionPolicies(10, 10);
        log.info("Retention: {} will set to the topic: {}", retention, testTopic);
        try {
            admin.topics().setRetention(testTopic, retention);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        retention = new RetentionPolicies(10, 9);
        log.info("Retention: {} will set to the topic: {}", retention, testTopic);
        try {
            admin.topics().setRetention(testTopic, retention);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        retention = new RetentionPolicies(10, 12);
        log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);
        admin.topics().setRetention(testTopic, retention);

        RetentionPolicies finalRetention = retention;
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), finalRetention));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testSetRetention() throws Exception {
        RetentionPolicies retention = new RetentionPolicies(60, 1024);
        log.info("Retention: {} will set to the topic: {}", retention, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setRetention(testTopic, retention);
        log.info("Retention set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), retention));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveRetention() throws Exception {

        RetentionPolicies retention = new RetentionPolicies(60, 1024);
        log.info("Retention: {} will set to the topic: {}", retention, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setRetention(testTopic, retention);
        log.info("Retention set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), retention));

        admin.topics().removeRetention(testTopic);
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getRetention(testTopic)));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test(timeOut = 10000)
    public void testRetentionAppliedApi() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        RetentionPolicies brokerPolicies =
                new RetentionPolicies(conf.getDefaultRetentionTimeInMinutes(), conf.getDefaultRetentionSizeInMB());
        assertEquals(admin.topics().getRetention(topic, true), brokerPolicies);

        RetentionPolicies namespacePolicies = new RetentionPolicies(10, 20);
        admin.namespaces().setRetention(myNamespace, namespacePolicies);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies));

        RetentionPolicies topicPolicies = new RetentionPolicies(20,30);
        admin.topics().setRetention(topic, topicPolicies);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.topics().getRetention(topic, true), topicPolicies));

        admin.topics().removeRetention(topic);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.topics().getRetention(topic, true), namespacePolicies));

        admin.namespaces().removeRetention(myNamespace);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies));
    }

    @Test(timeOut = 20000)
    public void testGetSubDispatchRateApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getSubscriptionDispatchRate(topic));
        assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace));
        DispatchRate brokerDispatchRate = new DispatchRate(
                pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
                pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(),
                1
        );
        assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
        DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12);

        admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
        assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), namespaceDispatchRate);

        DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22);
        admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionDispatchRate(topic)));
        assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), topicDispatchRate);

        admin.namespaces().removeSubscriptionDispatchRate(myNamespace);
        admin.topics().removeSubscriptionDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscriptionDispatchRate(topic)));
        assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
    }

    @Test(timeOut = 20000)
    public void testRetentionPriority() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getRetention(topic));
        assertNull(admin.namespaces().getRetention(myNamespace));

        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        Method shouldTopicBeRetained = PersistentTopic.class.getDeclaredMethod("shouldTopicBeRetained");
        shouldTopicBeRetained.setAccessible(true);
        Field lastActive = PersistentTopic.class.getSuperclass().getDeclaredField("lastActive");
        lastActive.setAccessible(true);
        //set last active to 2 minutes ago
        lastActive.setLong(persistentTopic, System.nanoTime() - TimeUnit.MINUTES.toNanos(2));
        //the default value of the broker-level is 0, so it is not retained by default
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        //set namespace-level policy
        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
        admin.namespaces().setRetention(myNamespace, retentionPolicies);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertNotNull(admin.namespaces().getRetention(myNamespace)));
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        // set topic-level policy
        admin.topics().setRetention(topic, new RetentionPolicies(3, 1));
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertNotNull(admin.topics().getRetention(topic)));
        assertTrue((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        //topic-level disabled
        admin.topics().setRetention(topic, new RetentionPolicies(0, 0));
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertEquals(admin.topics().getRetention(topic).getRetentionSizeInMB(), 0));
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        // remove topic-level policy
        admin.topics().removeRetention(topic);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertNull(admin.topics().getRetention(topic)));
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        //namespace-level disabled
        admin.namespaces().setRetention(myNamespace, new RetentionPolicies(0, 0));
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertNotNull(admin.namespaces().getRetention(myNamespace)));
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        //change namespace-level policy
        admin.namespaces().setRetention(myNamespace, new RetentionPolicies(1, 1));
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertNotNull(admin.namespaces().getRetention(myNamespace)));
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
        // remove namespace-level policy
        admin.namespaces().removeRetention(myNamespace);
        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(()
                -> assertNull(admin.namespaces().getRetention(myNamespace)));
        //the default value of the broker-level is 0, so it is not retained by default
        assertFalse((boolean) shouldTopicBeRetained.invoke(persistentTopic));
    }

    @Test(timeOut = 20000)
    public void testGetPersistenceApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getPersistence(topic));
        assertNull(admin.namespaces().getPersistence(myNamespace));
        PersistencePolicies brokerPolicy
                = new PersistencePolicies(pulsar.getConfiguration().getManagedLedgerDefaultEnsembleSize(),
                pulsar.getConfiguration().getManagedLedgerDefaultWriteQuorum(),
                pulsar.getConfiguration().getManagedLedgerDefaultAckQuorum(),
                pulsar.getConfiguration().getManagedLedgerDefaultMarkDeleteRateLimit());
        assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
        PersistencePolicies namespacePolicy
                = new PersistencePolicies(5,4,3,2);

        admin.namespaces().setPersistence(myNamespace, namespacePolicy);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getPersistence(myNamespace)));
        assertEquals(admin.topics().getPersistence(topic, true), namespacePolicy);

        PersistencePolicies topicPolicy = new PersistencePolicies(4, 3, 2, 1);
        admin.topics().setPersistence(topic, topicPolicy);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getPersistence(topic)));
        assertEquals(admin.topics().getPersistence(topic, true), topicPolicy);

        admin.namespaces().removePersistence(myNamespace);
        admin.topics().removePersistence(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getPersistence(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getPersistence(topic)));
        assertEquals(admin.topics().getPersistence(topic, true), brokerPolicy);
    }

    @Test
    public void testCheckPersistence() throws Exception {
        PersistencePolicies persistencePolicies = new PersistencePolicies(6, 2, 2, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
        try {
            admin.topics().setPersistence(testTopic, persistencePolicies);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 400);
        }

        persistencePolicies = new PersistencePolicies(2, 6, 2, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
        try {
            admin.topics().setPersistence(testTopic, persistencePolicies);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 400);
        }

        persistencePolicies = new PersistencePolicies(2, 2, 6, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
        try {
            admin.topics().setPersistence(testTopic, persistencePolicies);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 400);
        }

        persistencePolicies = new PersistencePolicies(1, 2, 2, 0.0);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, testTopic);
        try {
            admin.topics().setPersistence(testTopic, persistencePolicies);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 400);
        }

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testSetPersistence() throws Exception {
        PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3);
        admin.namespaces().setPersistence(myNamespace, persistencePoliciesForNamespace);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPersistence(myNamespace), persistencePoliciesForNamespace));

        PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setPersistence(persistenceTopic, persistencePolicies);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getPersistence(persistenceTopic), persistencePolicies));

        admin.lookups().lookupTopic(persistenceTopic);
        Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get();
        PersistentTopic persistentTopic = (PersistentTopic) t;
        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
        assertEquals(managedLedgerConfig.getEnsembleSize(), 3);
        assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3);
        assertEquals(managedLedgerConfig.getAckQuorumSize(), 3);
        assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1);

        PersistencePolicies getPersistencePolicies = admin.topics().getPersistence(persistenceTopic);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);
        Assert.assertEquals(getPersistencePolicies, persistencePolicies);
    }

    @Test
    public void testGetDispatchRateApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getDispatchRate(topic));
        assertNull(admin.namespaces().getDispatchRate(myNamespace));
        DispatchRate brokerDispatchRate = new DispatchRate(
                pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(),
                pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(),
                1
        );
        assertEquals(admin.topics().getDispatchRate(topic, true), brokerDispatchRate);
        DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12);

        admin.namespaces().setDispatchRate(myNamespace, namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getDispatchRate(myNamespace)));
        assertEquals(admin.topics().getDispatchRate(topic, true), namespaceDispatchRate);

        DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22);
        admin.topics().setDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getDispatchRate(topic)));
        assertEquals(admin.topics().getDispatchRate(topic, true), topicDispatchRate);

        admin.namespaces().removeDispatchRate(myNamespace);
        admin.topics().removeDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDispatchRate(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getDispatchRate(topic)));
        assertEquals(admin.topics().getDispatchRate(topic, true), brokerDispatchRate);
    }

    @Test
    public void testRemovePersistence() throws Exception {
        PersistencePolicies persistencePoliciesForNamespace = new PersistencePolicies(2, 2, 2, 0.3);
        admin.namespaces().setPersistence(myNamespace, persistencePoliciesForNamespace);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getPersistence(myNamespace), persistencePoliciesForNamespace));

        PersistencePolicies persistencePolicies = new PersistencePolicies(3, 3, 3, 0.1);
        log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setPersistence(persistenceTopic, persistencePolicies);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getPersistence(persistenceTopic), persistencePolicies));

        admin.topics().removePersistence(persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getPersistence(persistenceTopic)));

        admin.lookups().lookupTopic(persistenceTopic);
        Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get();
        PersistentTopic persistentTopic = (PersistentTopic) t;
        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
        assertEquals(managedLedgerConfig.getEnsembleSize(), 2);
        assertEquals(managedLedgerConfig.getWriteQuorumSize(), 2);
        assertEquals(managedLedgerConfig.getAckQuorumSize(), 2);
        assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.3);
    }

    @Test
    public void testCheckMaxProducers() throws Exception {
        int maxProducers = -1;
        log.info("MaxProducers: {} will set to the topic: {}", maxProducers, testTopic);
        try {
            admin.topics().setMaxProducers(testTopic, maxProducers);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testGetMaxProducerApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getMaxProducers(topic));
        assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace));
        assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic());

        admin.namespaces().setMaxProducersPerTopic(myNamespace, 7);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getMaxProducersPerTopic(myNamespace)));
        assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), 7);

        admin.topics().setMaxProducers(topic, 1000);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxProducers(topic)));
        assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), 1000);

        admin.namespaces().removeMaxProducersPerTopic(myNamespace);
        admin.topics().removeMaxProducers(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getMaxProducersPerTopic(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getMaxProducers(topic)));
        assertEquals(admin.topics().getMaxProducers(topic, true).intValue(), conf.getMaxProducersPerTopic());
    }

    @Test
    public void testSetMaxProducers() throws Exception {
        Integer maxProducers = 2;
        log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setMaxProducers(persistenceTopic, maxProducers);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers));

        admin.topics().createPartitionedTopic(persistenceTopic, 2);
        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
        Producer<byte[]> producer3 = null;

        try {
            producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max producers limit on topic level.");
        }
        Assert.assertNotNull(producer1);
        Assert.assertNotNull(producer2);
        Assert.assertNull(producer3);

        admin.topics().deletePartitionedTopic(persistenceTopic, true);
        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveMaxProducers() throws Exception {
        Integer maxProducers = 2;
        log.info("MaxProducers: {} will set to the topic: {}", maxProducers, persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setMaxProducers(persistenceTopic, maxProducers);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers));

        admin.topics().createPartitionedTopic(persistenceTopic, 2);
        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistenceTopic).create();
        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(persistenceTopic).create();
        Producer<byte[]> producer3 = null;
        Producer<byte[]> producer4 = null;

        try {
            producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max producers limit on topic level.");
        }
        Assert.assertNotNull(producer1);
        Assert.assertNotNull(producer2);
        Assert.assertNull(producer3);

        admin.topics().removeMaxProducers(persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getMaxProducers(persistenceTopic)));

        producer3 = pulsarClient.newProducer().topic(persistenceTopic).create();
        Assert.assertNotNull(producer3);
        admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace).intValue(), 3));

        log.info("MaxProducers: {} will set to the namespace: {}", 3, myNamespace);
        try {
            producer4 = pulsarClient.newProducer().topic(persistenceTopic).create();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max producers limit on namespace level.");
        }
        Assert.assertNull(producer4);

        producer1.close();
        producer2.close();
        producer3.close();
        admin.topics().deletePartitionedTopic(persistenceTopic, true);
        admin.topics().deletePartitionedTopic(testTopic, true);
    }


    @Test
    public void testGetSetDispatchRate() throws Exception {
        DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true);
        log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setDispatchRate(testTopic, dispatchRate);
        log.info("Dispatch Rate set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getDispatchRate(testTopic), dispatchRate));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveDispatchRate() throws Exception {
        DispatchRate dispatchRate = new DispatchRate(100, 10000, 1, true);
        log.info("Dispatch Rate: {} will set to the topic: {}", dispatchRate, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setDispatchRate(testTopic, dispatchRate);
        log.info("Dispatch Rate set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getDispatchRate(testTopic), dispatchRate));

        admin.topics().removeDispatchRate(testTopic);
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getDispatchRate(testTopic)));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test(timeOut = 20000)
    public void testPolicyOverwrittenByNamespaceLevel() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate dispatchRate = new DispatchRate(200, 20000, 1, true);
        admin.namespaces().setDispatchRate(myNamespace, dispatchRate);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getDispatchRate(myNamespace).dispatchThrottlingRateInMsg, 200));

        dispatchRate = new DispatchRate(100, 10000, 1, true);
        admin.topics().setDispatchRate(topic, dispatchRate);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNotNull(admin.topics().getDispatchRate(topic)));

        //1 Set ns level policy, topic level should not be overwritten
        dispatchRate = new DispatchRate(300, 30000, 2, true);
        admin.namespaces().setDispatchRate(myNamespace, dispatchRate);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
                    Assert.assertEquals(limiter.getDispatchRateOnByte(), 10000);
                    Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100);
                });

        admin.topics().removeDispatchRate(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getDispatchRate(topic)));

        //2 Remove level policy ,DispatchRateLimiter should us ns level policy
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    DispatchRateLimiter limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
                    Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000);
                    Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300);
                });
    }

    @Test(timeOut = 20000)
    public void testRestart() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        InactiveTopicPolicies inactiveTopicPolicies =
                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true);
        admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getInactiveTopicPolicies(myNamespace).getInactiveTopicDeleteMode(),
                        InactiveTopicDeleteMode.delete_when_subscriptions_caught_up));

        inactiveTopicPolicies =
                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
        admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNotNull(admin.topics().getInactiveTopicPolicies(topic)));

        // restart broker, policy should still take effect
        restartBroker();

        // Trigger the cache init.
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

        InactiveTopicPolicies finalInactiveTopicPolicies = inactiveTopicPolicies;
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .untilAsserted(() -> {
                    PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
                    Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), finalInactiveTopicPolicies);
                });

        producer.close();
    }

    @Test
    public void testGetSetSubscriptionDispatchRate() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate dispatchRate = new DispatchRate(1000,
                1024 * 1024, 1);
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);

        admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
        log.info("Subscription dispatch rate set success on topic: {}", topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate));

        String subscriptionName = "test_subscription_rate";
        Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();

        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull(dispatchRateLimiter);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);

        consumer.close();
        admin.topics().delete(topic, true);
    }

    @Test
    public void testGetSetSubscriptionDispatchRateAfterTopicLoaded() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate dispatchRate = new DispatchRate(1000,
                1024 * 1024, 1);
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);

        String subscriptionName = "test_subscription_rate";
        Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();

        admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
        log.info("Subscription dispatch rate set success on topic: {}", topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate));

        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull(dispatchRateLimiter);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);

        consumer.close();
        admin.topics().delete(topic, true);
    }

    @Test
    public void testRemoveSubscriptionDispatchRate() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        DispatchRate dispatchRate = new DispatchRate(1000,
                1024 * 1024, 1);
        log.info("Subscription Dispatch Rate: {} will set to the topic: {}", dispatchRate, topic);

        admin.topics().setSubscriptionDispatchRate(topic, dispatchRate);
        log.info("Subscription dispatch rate set success on topic: {}", topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), dispatchRate));

        String subscriptionName = "test_subscription_rate";
        Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();

        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotNull(dispatchRateLimiter);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);

        // remove subscription dispatch rate
        admin.topics().removeSubscriptionDispatchRate(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getSubscriptionDispatchRate(topic)));

        dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.dispatchThrottlingRateInMsg);
        Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.dispatchThrottlingRateInByte);

        consumer.close();
        admin.topics().delete(topic, true);
    }

    @Test
    public void testSubscriptionDispatchRatePolicyOverwrittenNamespaceLevel() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        // set namespace level subscription dispatch rate
        DispatchRate namespaceDispatchRate = new DispatchRate(100, 1024 * 1024, 1);
        admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(myNamespace), namespaceDispatchRate));

        String subscriptionName = "test_subscription_rate";
        Consumer<byte[]> consumer = pulsarClient.newConsumer().subscriptionName(subscriptionName).topic(topic).subscribe();

        // get subscription dispatch Rate limiter
        DispatchRateLimiter dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
                .get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte);

        // set topic level subscription dispatch rate
        DispatchRate topicDispatchRate = new DispatchRate(200, 2 * 1024 * 1024, 1);
        admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscriptionDispatchRate(topic), topicDispatchRate));

        // get subscription dispatch rate limiter
        dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
                .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), topicDispatchRate.dispatchThrottlingRateInByte);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), topicDispatchRate.dispatchThrottlingRateInMsg);

        // remove topic level subscription dispatch rate limiter
        admin.topics().removeSubscriptionDispatchRate(topic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getSubscriptionDispatchRate(topic)));

        // get subscription dispatch rate limiter
        dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get()
                .getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), namespaceDispatchRate.dispatchThrottlingRateInByte);
        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), namespaceDispatchRate.dispatchThrottlingRateInMsg);

        consumer.close();
        admin.topics().delete(topic, true);
    }

    @Test
    public void testGetSetCompactionThreshold() throws Exception {
        Long compactionThreshold = 100000L;
        log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
        log.info("Compaction threshold set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getCompactionThreshold(testTopic), compactionThreshold));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveCompactionThreshold() throws Exception {
        Long compactionThreshold = 100000L;
        log.info("Compaction threshold: {} will set to the topic: {}", compactionThreshold, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setCompactionThreshold(testTopic, compactionThreshold);
        log.info("Compaction threshold set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getCompactionThreshold(testTopic), compactionThreshold));

        admin.topics().removeCompactionThreshold(testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getCompactionThreshold(testTopic)));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testGetSetMaxConsumersPerSubscription() throws Exception {
        Integer maxConsumersPerSubscription = 10;
        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
        log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumersPerSubscription(testTopic), maxConsumersPerSubscription));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveMaxConsumersPerSubscription() throws Exception {
        Integer maxConsumersPerSubscription = 10;
        log.info("MaxConsumersPerSubscription: {} will set to the topic: {}", maxConsumersPerSubscription, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setMaxConsumersPerSubscription(testTopic, maxConsumersPerSubscription);
        log.info("MaxConsumersPerSubscription set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumersPerSubscription(testTopic), maxConsumersPerSubscription));

        admin.topics().removeMaxConsumersPerSubscription(testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getMaxConsumersPerSubscription(testTopic)));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testGetSetPublishRate() throws Exception {
        PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
        log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setPublishRate(testTopic, publishRate);
        log.info("Publish Rate set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getPublishRate(testTopic), publishRate));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemovePublishRate() throws Exception {
        PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
        log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setPublishRate(testTopic, publishRate);
        log.info("Publish Rate set success on topic: {}", testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getPublishRate(testTopic), publishRate));

        admin.topics().removePublishRate(testTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getPublishRate(testTopic)));

        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testCheckMaxConsumers() throws Exception {
        Integer maxProducers = new Integer(-1);
        log.info("MaxConsumers: {} will set to the topic: {}", maxProducers, testTopic);
        try {
            admin.topics().setMaxConsumers(testTopic, maxProducers);
            Assert.fail();
        } catch (PulsarAdminException e) {
            Assert.assertEquals(e.getStatusCode(), 412);
        }

        admin.topics().deletePartitionedTopic(testTopic, true);
    }
    @Test
    public void testGetMaxConsumersApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getMaxConsumers(topic));
        assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace));
        assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic());

        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 7);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)));
        assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 7);

        admin.topics().setMaxConsumers(topic, 1000);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getMaxConsumers(topic)));
        assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), 1000);

        admin.namespaces().removeMaxConsumersPerTopic(myNamespace);
        admin.topics().removeMaxConsumers(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getMaxConsumers(topic)));
        assertEquals(admin.topics().getMaxConsumers(topic, true).intValue(), conf.getMaxConsumersPerTopic());
    }

    @Test
    public void testSetMaxConsumers() throws Exception {
        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxConsumersPerTopic(myNamespace).intValue(), 1));
        log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace);
        Integer maxConsumers = 2;
        log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers));

        admin.topics().createPartitionedTopic(persistenceTopic, 2);
        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
        Consumer<byte[]> consumer3 = null;

        try {
            consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit");
        }
        Assert.assertNotNull(consumer1);
        Assert.assertNotNull(consumer2);
        Assert.assertNull(consumer3);
        consumer1.close();
        consumer2.close();

        admin.topics().deletePartitionedTopic(persistenceTopic, true);
        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testRemoveMaxConsumers() throws Exception {
        Integer maxConsumers = 2;
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers));

        admin.topics().createPartitionedTopic(persistenceTopic, 2);
        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
        Consumer<byte[]> consumer3 = null;

        try {
            consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit");
        }
        Assert.assertNotNull(consumer1);
        Assert.assertNotNull(consumer2);
        Assert.assertNull(consumer3);

        admin.topics().removeMaxConsumers(persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertNull(admin.topics().getMaxConsumers(persistenceTopic)));

        consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
        Assert.assertNotNull(consumer3);
        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3);
        log.info("MaxConsumers: {} will set to the namespace: {}", 3, myNamespace);

        Consumer<byte[]> consumer4 = null;
        try {
            consumer4 = pulsarClient.newConsumer().subscriptionName("sub4").topic(persistenceTopic).subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("Topic reached max consumers limit on namespace level.");
        }
        Assert.assertNull(consumer4);

        consumer1.close();
        consumer2.close();
        consumer3.close();
        admin.topics().deletePartitionedTopic(persistenceTopic, true);
        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testGetSetSubscribeRate() throws Exception {
        admin.topics().createPartitionedTopic(persistenceTopic, 2);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        SubscribeRate subscribeRate1 = new SubscribeRate(1, 30);
        log.info("Subscribe Rate: {} will be set to the namespace: {}", subscribeRate1, myNamespace);
        admin.namespaces().setSubscribeRate(myNamespace, subscribeRate1);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getSubscribeRate(myNamespace), subscribeRate1));

        SubscribeRate subscribeRate2 =  new SubscribeRate(2, 30);
        log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate2, persistenceTopic);
        admin.topics().setSubscribeRate(persistenceTopic, subscribeRate2);
        log.info("Subscribe Rate set success on topic: {}", persistenceTopic);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscribeRate(persistenceTopic), subscribeRate2));

        PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);

        Consumer consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1")
                .topic(persistenceTopic).consumerName("test").subscribe();
        Assert.assertNotNull(consumer1);
        consumer1.close();
        pulsarClient1.shutdown();

        Consumer consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1")
                .topic(persistenceTopic).consumerName("test").subscribe();
        Assert.assertNotNull(consumer2);
        consumer2.close();
        pulsarClient2.shutdown();

        Consumer consumer3 = null;

        try {
            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
                    .topic(persistenceTopic).consumerName("test").subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("subscribe rate reached max subscribe rate limit");
        }

        Assert.assertNull(consumer3);
        pulsarClient3.shutdown();

        admin.topics().deletePartitionedTopic(testTopic, true);
        admin.topics().deletePartitionedTopic(persistenceTopic, true);
    }

    @Test(timeOut = 20000)
    public void testGetSubscribeRateApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getSubscribeRate(topic));
        assertNull(admin.namespaces().getSubscribeRate(myNamespace));
        SubscribeRate brokerPolicy = new SubscribeRate(
                pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
                pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
        );
        assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
        SubscribeRate namespacePolicy = new SubscribeRate(10, 11);

        admin.namespaces().setSubscribeRate(myNamespace, namespacePolicy);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscribeRate(myNamespace)));
        assertEquals(admin.topics().getSubscribeRate(topic, true), namespacePolicy);

        SubscribeRate topicPolicy = new SubscribeRate(20, 21);
        admin.topics().setSubscribeRate(topic, topicPolicy);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscribeRate(topic)));
        assertEquals(admin.topics().getSubscribeRate(topic, true), topicPolicy);

        admin.namespaces().removeSubscribeRate(myNamespace);
        admin.topics().removeSubscribeRate(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscribeRate(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscribeRate(topic)));
        assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
    }

    @Test(timeOut = 30000)
    public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        int maxConsumerInBroker = 1;
        int maxConsumerInNs = 2;
        int maxConsumerInTopic = 4;
        String mySub = "my-sub";
        conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().until(() ->
                pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        List<Consumer<String>> consumerList = new ArrayList<>();
        ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Shared)
                .topic(topic).subscriptionName(mySub);
        consumerList.add(builder.subscribe());
        try {
            builder.subscribe();
            fail("should fail");
        } catch (PulsarClientException ignored) {
        }

        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs);
        Awaitility.await().untilAsserted(() ->
                assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
        consumerList.add(builder.subscribe());
        try {
            builder.subscribe();
            fail("should fail");
        } catch (PulsarClientException ignored) {
        }
        //disabled
        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0);
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0));
        consumerList.add(builder.subscribe());
        //set topic-level
        admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic);
        Awaitility.await().untilAsserted(() ->
                assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic)));
        consumerList.add(builder.subscribe());
        try {
            builder.subscribe();
            fail("should fail");
        } catch (PulsarClientException ignored) {
        }
        //remove topic policies
        admin.topics().removeMaxConsumersPerSubscription(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));
        consumerList.add(builder.subscribe());
        //remove namespace policies, then use broker-level
        admin.namespaces().removeMaxConsumersPerSubscription(myNamespace);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
        try {
            builder.subscribe();
            fail("should fail");
        } catch (PulsarClientException ignored) {
        }

        for (Consumer<String> consumer : consumerList) {
            consumer.close();
        }
    }

    @Test
    public void testRemoveSubscribeRate() throws Exception {
        admin.topics().createPartitionedTopic(persistenceTopic, 2);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
             .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));

        SubscribeRate subscribeRate = new SubscribeRate(2, 30);
        log.info("Subscribe Rate: {} will set to the topic: {}", subscribeRate, persistenceTopic);
        admin.topics().setSubscribeRate(persistenceTopic, subscribeRate);
        log.info("Subscribe Rate set success on topic: {}", persistenceTopic);

         Awaitility.await().atMost(3, TimeUnit.SECONDS)
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getSubscribeRate(persistenceTopic), subscribeRate));

        PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);

        Consumer<byte[]> consumer1 = pulsarClient1.newConsumer().subscriptionName("sub1")
                .topic(persistenceTopic).consumerName("test").subscribe();
         Assert.assertNotNull(consumer1);
         consumer1.close();
         pulsarClient1.shutdown();

        Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().subscriptionName("sub1")
                .topic(persistenceTopic).consumerName("test").subscribe();
         Assert.assertNotNull(consumer2);
         consumer2.close();
         pulsarClient2.shutdown();

        Consumer<byte[]> consumer3 = null;

        try {
            consumer3 = pulsarClient3.newConsumer().subscriptionName("sub1")
                    .topic(persistenceTopic).consumerName("test").subscribe();
            Assert.fail();
        } catch (PulsarClientException e) {
            log.info("subscribe rate reached max subscribe rate limit");
        }
        Assert.assertNull(consumer3);

        admin.topics().removeSubscribeRate(persistenceTopic);
        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                 .untilAsserted(() -> Assert.assertNull(admin.topics().getSubscribeRate(persistenceTopic)));

        admin.topics().unload(persistenceTopic);

        PulsarClient pulsarClient4 = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient pulsarClient5 = newPulsarClient(lookupUrl.toString(), 0);
        PulsarClient pulsarClient6 = newPulsarClient(lookupUrl.toString(), 0);

         consumer3 = pulsarClient3.newConsumer().subscriptionName("sub2")
                 .topic(persistenceTopic).consumerName("test").subscribe();
         Assert.assertNotNull(consumer3);
         consumer3.close();
         pulsarClient3.shutdown();

         Consumer<byte[]> consumer4 = pulsarClient4.newConsumer().subscriptionName("sub2")
                 .topic(persistenceTopic).consumerName("test").subscribe();
         Assert.assertNotNull(consumer4);
         consumer4.close();
         pulsarClient4.shutdown();

         Consumer<byte[]> consumer5 = pulsarClient5.newConsumer().subscriptionName("sub2")
                 .topic(persistenceTopic).consumerName("test").subscribe();
         Assert.assertNotNull(consumer5);
         consumer5.close();
         pulsarClient5.shutdown();

         Consumer<byte[]> consumer6 = pulsarClient6.newConsumer().subscriptionName("sub2")
                 .topic(persistenceTopic).consumerName("test").subscribe();
         Assert.assertNotNull(consumer6);
         consumer6.close();
         pulsarClient6.shutdown();

        admin.topics().deletePartitionedTopic(persistenceTopic, true);
        admin.topics().deletePartitionedTopic(testTopic, true);
    }

    @Test
    public void testPublishRateInDifferentLevelPolicy() throws Exception {
        cleanup();
        conf.setMaxPublishRatePerTopicInMessages(5);
        conf.setMaxPublishRatePerTopicInBytes(50L);
        setup();

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic)));

        final String topicName = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        pulsarClient.newProducer().topic(topicName).create().close();
        Field publishMaxMessageRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxMessageRate");
        publishMaxMessageRate.setAccessible(true);
        Field publishMaxByteRate = PublishRateLimiterImpl.class.getDeclaredField("publishMaxByteRate");
        publishMaxByteRate.setAccessible(true);

        //1 use broker-level policy by default
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
        PublishRateLimiterImpl publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
        Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
        Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);

        //2 set namespace-level policy
        PublishRate publishMsgRate = new PublishRate(10, 100L);
        admin.namespaces().setPublishRate(myNamespace, publishMsgRate);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> {
                    PublishRateLimiterImpl limiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
                    return (int)publishMaxMessageRate.get(limiter) == 10;
                });

        publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
        Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 10);
        Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 100L);

        //3 set topic-level policy, namespace-level policy should be overwritten
        PublishRate publishMsgRate2 = new PublishRate(11, 101L);
        admin.topics().setPublishRate(topicName, publishMsgRate2);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> admin.topics().getPublishRate(topicName) != null);

        publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
        Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 11);
        Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 101L);

        //4 remove topic-level policy, namespace-level policy will take effect
        admin.topics().removePublishRate(topicName);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> admin.topics().getPublishRate(topicName) == null);

        publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
        Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 10);
        Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 100L);

        //5 remove namespace-level policy, broker-level policy will take effect
        admin.namespaces().removePublishRate(myNamespace);

        Awaitility.await().atMost(3, TimeUnit.SECONDS)
                .until(() -> {
                    PublishRateLimiterImpl limiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
                    return (int)publishMaxMessageRate.get(limiter) == 5;
                });

        publishRateLimiter = (PublishRateLimiterImpl) topic.getTopicPublishRateLimiter();
        Assert.assertEquals(publishMaxMessageRate.get(publishRateLimiter), 5);
        Assert.assertEquals(publishMaxByteRate.get(publishRateLimiter), 50L);
    }

    @Test(timeOut = 20000)
    public void testTopicMaxMessageSizeApi() throws Exception{
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic)));
        assertNull(admin.topics().getMaxMessageSize(persistenceTopic));

        admin.topics().setMaxMessageSize(persistenceTopic,10);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null);
        assertEquals(admin.topics().getMaxMessageSize(persistenceTopic).intValue(),10);

        admin.topics().removeMaxMessageSize(persistenceTopic);
        assertNull(admin.topics().getMaxMessageSize(persistenceTopic));

        try {
            admin.topics().setMaxMessageSize(persistenceTopic,Integer.MAX_VALUE);
            fail("should fail");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(),412);
        }
        try {
            admin.topics().setMaxMessageSize(persistenceTopic, -1);
            fail("should fail");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(),412);
        }
    }

    @Test(timeOut = 20000)
    public void testTopicMaxMessageSize() throws Exception{
        doTestTopicMaxMessageSize(true);
        doTestTopicMaxMessageSize(false);
    }

    private void doTestTopicMaxMessageSize(boolean isPartitioned) throws Exception {
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        if (isPartitioned) {
            admin.topics().createPartitionedTopic(topic, 3);
        }
        // init cache
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getMaxMessageSize(topic));
        // set msg size
        admin.topics().setMaxMessageSize(topic, 10);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
        assertEquals(admin.topics().getMaxMessageSize(topic).intValue(), 10);

        try {
            producer.send(new byte[1024]);
        } catch (PulsarClientException e) {
            assertTrue(e instanceof PulsarClientException.NotAllowedException);
        }

        admin.topics().removeMaxMessageSize(topic);
        assertNull(admin.topics().getMaxMessageSize(topic));

        try {
            admin.topics().setMaxMessageSize(topic, Integer.MAX_VALUE);
            fail("should fail");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 412);
        }
        try {
            admin.topics().setMaxMessageSize(topic, -1);
            fail("should fail");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 412);
        }

        MessageId messageId = producer.send(new byte[1024]);
        assertNotNull(messageId);
        producer.close();
    }

    @Test(timeOut = 20000)
    public void testMaxSubscriptionsFailFast() throws Exception {
        doTestMaxSubscriptionsFailFast(SubscriptionMode.Durable);
        doTestMaxSubscriptionsFailFast(SubscriptionMode.NonDurable);
    }

    private void doTestMaxSubscriptionsFailFast(SubscriptionMode subMode) throws Exception {
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        // init cache
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        int maxSubInNamespace = 2;
        List<Consumer> consumers = new ArrayList<>();
        ConsumerBuilder consumerBuilder = pulsarClient.newConsumer().subscriptionMode(subMode)
                .subscriptionType(SubscriptionType.Shared).topic(topic);
        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSubInNamespace);
        Awaitility.await().untilAsserted(()
                -> assertNotNull(admin.namespaces().getMaxSubscriptionsPerTopic(myNamespace)));
        for (int i = 0; i < maxSubInNamespace; i++) {
            consumers.add(consumerBuilder.subscriptionName("sub" + i).subscribe());
        }
        long start = System.currentTimeMillis();
        try {
            consumerBuilder.subscriptionName("sub").subscribe();
            fail("should fail");
        } catch (PulsarClientException e) {
            assertTrue(e instanceof PulsarClientException.NotAllowedException);
        }
        //fail fast
        assertTrue(System.currentTimeMillis() - start < 3000);
        //clean
        for (Consumer consumer : consumers) {
            consumer.close();
        }
    }

    @Test(timeOut = 20000)
    public void testMaxSubscriptionsPerTopicApi() throws Exception {
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        // init cache
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic));
        // set max subscriptions
        admin.topics().setMaxSubscriptionsPerTopic(topic, 10);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
        assertEquals(admin.topics().getMaxSubscriptionsPerTopic(topic).intValue(), 10);
        // remove max subscriptions
        admin.topics().removeMaxSubscriptionsPerTopic(topic);
        assertNull(admin.topics().getMaxSubscriptionsPerTopic(topic));
        // set invalidate value
        try {
            admin.topics().setMaxMessageSize(topic, -1);
            fail("should fail");
        } catch (PulsarAdminException e) {
            assertEquals(e.getStatusCode(), 412);
        }
    }

    @Test(timeOut = 20000)
    public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception {
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        // init cache
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        // Set topic-level max subscriptions
        final int topicLevelMaxSubNum = 2;
        admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
        List<Consumer<String>> consumerList = new ArrayList<>();
        String subName = "my-sub-";
        for (int i = 0; i < topicLevelMaxSubNum; i++) {
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionName(subName + i)
                    .topic(topic).subscribe();
            consumerList.add(consumer);
        }
        // should fail
        try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS)
                .serviceUrl(brokerUrl.toString()).build()) {
            consumerList.add(client.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic).subscribe());
            fail("should fail");
        } catch (PulsarClientException ignore) {
            assertEquals(consumerList.size(), topicLevelMaxSubNum);
        }
        //create a consumer with the same subscription name, it should succeed
        pulsarClient.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(subName + "0")
                .topic(topic).subscribe().close();

        //Clean up
        for (Consumer<String> c : consumerList) {
            c.close();
        }
    }

    @Test
    public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception {
        cleanup();
        conf.setMaxUnackedMessagesPerSubscription(30);
        setup();
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        // init cache
        @Cleanup
        Producer producer = pulsarClient.newProducer().topic(topic).create();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        //default value is null
        assertNull(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace));
        int msgNum = 100;
        int maxUnackedMsgOnTopic = 10;
        int maxUnackedMsgNumOnNamespace = 5;
        int defaultMaxUnackedMsgOnBroker = pulsar.getConfiguration().getMaxUnackedMessagesPerSubscription();
        produceMsg(producer, msgNum);
        //set namespace-level policy, the restriction should take effect
        admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, maxUnackedMsgNumOnNamespace);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue(), maxUnackedMsgNumOnNamespace));
        List<Message<?>> messages;
        String subName = "sub-" + UUID.randomUUID();
        ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topic)
                .subscriptionName(subName).receiverQueueSize(1)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Shared);
        @Cleanup
        Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
        messages = getMsgReceived(consumer1, msgNum);
        assertEquals(messages.size(), maxUnackedMsgNumOnNamespace);
        ackMessages(consumer1, messages);
        //disable namespace-level policy, should unlimited
        admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, 0);
        Awaitility.await().untilAsserted(()
                -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue(), 0));
        messages = getMsgReceived(consumer1, 40);
        assertEquals(messages.size(), 40);
        ackMessages(consumer1, messages);

        //set topic-level and namespace-level policy, topic-level should has higher priority
        admin.namespaces().setMaxUnackedMessagesPerSubscription(myNamespace, maxUnackedMsgNumOnNamespace);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertEquals(admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace).intValue()
                , maxUnackedMsgNumOnNamespace));
        admin.topics().setMaxUnackedMessagesOnSubscription(topic, maxUnackedMsgOnTopic);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertNotNull(admin.topics().getMaxUnackedMessagesOnSubscription(topic)));
        //check the value applied
        PersistentTopic persistentTopic = (PersistentTopic)pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        assertEquals(persistentTopic.getMaxUnackedMessagesOnSubscription(), maxUnackedMsgOnTopic);
        messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
        assertEquals(messages.size(), maxUnackedMsgOnTopic);
        ackMessages(consumer1, messages);

        //remove both namespace-level and topic-level policy, broker-level should take effect
        admin.namespaces().removeMaxUnackedMessagesPerSubscription(myNamespace);
        admin.topics().removeMaxUnackedMessagesOnSubscription(topic);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> admin.namespaces().getMaxUnackedMessagesPerSubscription(myNamespace) == null
                        && admin.topics().getMaxUnackedMessagesOnSubscription(topic) == null);
        messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
        assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker);
    }

    private void produceMsg(Producer producer, int msgNum) throws Exception{
        for (int i = 0; i < msgNum; i++) {
            producer.send("msg".getBytes());
        }
    }

    private List<Message<?>> getMsgReceived(Consumer<byte[]> consumer1, int msgNum) throws PulsarClientException {
        List<Message<?>> messages = new ArrayList<>();
        for (int i = 0; i < msgNum; i++) {
            Message<?> message = consumer1.receive(1000, TimeUnit.MILLISECONDS);
            if (message == null) {
                break;
            }
            messages.add(message);
        }
        return messages;
    }

    private void ackMessages(Consumer<?> consumer, List<Message<?>> messages) throws Exception{
        for (Message<?> message : messages) {
            consumer.acknowledge(message);
        }
    }

    @Test(timeOut = 20000)
    public void testMaxSubscriptionsPerTopic() throws Exception {
        int brokerLevelMaxSub = 4;
        conf.setMaxSubscriptionsPerTopic(4);
        restartBroker();
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        // init cache
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        // Set topic-level max subscriptions
        final int topicLevelMaxSubNum = 2;
        admin.topics().setMaxSubscriptionsPerTopic(topic, topicLevelMaxSubNum);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);

        List<Consumer<String>> consumerList = new ArrayList<>();
        for (int i = 0; i < topicLevelMaxSubNum; i++) {
            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic).subscribe();
            consumerList.add(consumer);
        }
        try (PulsarClient client = PulsarClient.builder().operationTimeout(2, TimeUnit.SECONDS)
                .serviceUrl(brokerUrl.toString()).build()) {
            consumerList.add(client.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic).subscribe());
            fail("should fail");
        } catch (PulsarClientException ignore) {
            assertEquals(consumerList.size(), topicLevelMaxSubNum);
        }
        // Set namespace-level policy, but will not take effect
        final int namespaceLevelMaxSub = 3;
        admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, namespaceLevelMaxSub);
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
        field.setAccessible(true);
        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> (int) field.get(persistentTopic) == namespaceLevelMaxSub);

        try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
                .serviceUrl(brokerUrl.toString()).build()) {
            client.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic).subscribe();
            fail("should fail");
        } catch (PulsarClientException ignore) {
            assertEquals(consumerList.size(), topicLevelMaxSubNum);
        }
        //Removed topic-level policy, namespace-level should take effect
        admin.topics().removeMaxSubscriptionsPerTopic(topic);
        consumerList.add(pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe());
        assertEquals(consumerList.size(), namespaceLevelMaxSub);
        try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
                .serviceUrl(brokerUrl.toString()).build()) {
            consumerList.add(client.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic).subscribe());
            fail("should fail");
        } catch (PulsarClientException ignore) {
            assertEquals(consumerList.size(), namespaceLevelMaxSub);
        }
        //Removed namespace-level policy, broker-level should take effect
        admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> field.get(persistentTopic) == null);
        consumerList.add(pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe());
        assertEquals(consumerList.size(), brokerLevelMaxSub);
        try (PulsarClient client = PulsarClient.builder().operationTimeout(1000, TimeUnit.MILLISECONDS)
                .serviceUrl(brokerUrl.toString()).build()) {
            consumerList.add(client.newConsumer(Schema.STRING)
                    .subscriptionName(UUID.randomUUID().toString())
                    .topic(topic).subscribe());
            fail("should fail");
        } catch (PulsarClientException ignore) {
            assertEquals(consumerList.size(), brokerLevelMaxSub);
        }
        //Clean up
        for (Consumer<String> c : consumerList) {
            c.close();
        }
    }

    @Test(timeOut = 30000)
    public void testReplicatorRateApi() throws Exception {
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        // init cache
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

        assertNull(admin.topics().getReplicatorDispatchRate(topic));

        DispatchRate dispatchRate = new DispatchRate(100,200L,10);
        admin.topics().setReplicatorDispatchRate(topic, dispatchRate);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate));

        admin.topics().removeReplicatorDispatchRate(topic);
        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
                -> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
    }

    @Test(timeOut = 20000)
    public void testGetReplicatorRateApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getReplicatorDispatchRate(topic));
        assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace));
        DispatchRate brokerDispatchRate = new DispatchRate(
                pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInMsg(),
                pulsar.getConfiguration().getDispatchThrottlingRatePerReplicatorInByte(),
                1
        );
        assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), brokerDispatchRate);
        DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12);

        admin.namespaces().setReplicatorDispatchRate(myNamespace, namespaceDispatchRate);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)));
        assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), namespaceDispatchRate);

        DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22);
        admin.topics().setReplicatorDispatchRate(topic, topicDispatchRate);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getReplicatorDispatchRate(topic)));
        assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), topicDispatchRate);

        admin.namespaces().removeReplicatorDispatchRate(myNamespace);
        admin.topics().removeReplicatorDispatchRate(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getReplicatorDispatchRate(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
        assertEquals(admin.topics().getReplicatorDispatchRate(topic, true), brokerDispatchRate);
    }

    @Test(timeOut = 30000)
    public void testAutoCreationDisabled() throws Exception {
        cleanup();
        conf.setAllowAutoTopicCreation(false);
        setup();
        final String topic = testTopic + UUID.randomUUID();
        admin.topics().createPartitionedTopic(topic, 3);
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        //should not fail
        assertNull(admin.topics().getMessageTTL(topic));
    }

    @Test(timeOut = 30000)
    public void testSubscriptionTypesEnabled() throws Exception {
        final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
        admin.topics().createNonPartitionedTopic(topic);

        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        // use broker.conf
        pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe().close();
        assertNull(admin.topics().getSubscriptionTypesEnabled(topic));
        // set enable failover sub type
        Set<SubscriptionType> subscriptionTypeSet = new HashSet<>();
        subscriptionTypeSet.add(SubscriptionType.Failover);
        admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);

        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
                -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)) != null);
        subscriptionTypeSet = admin.topics().getSubscriptionTypesEnabled(topic);
        assertTrue(subscriptionTypeSet.contains(SubscriptionType.Failover));
        assertFalse(subscriptionTypeSet.contains(SubscriptionType.Shared));
        assertEquals(subscriptionTypeSet.size(), 1);
        try {
            pulsarClient.newConsumer().topic(topic)
                    .subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe();
            fail();
        } catch (PulsarClientException pulsarClientException) {
            assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
        }

        // add shared type
        subscriptionTypeSet.add(SubscriptionType.Shared);
        admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);
        pulsarClient.newConsumer().topic(topic)
                .subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe().close();

        // test namespace and topic policy
        subscriptionTypeSet.add(SubscriptionType.Shared);
        admin.namespaces().setSubscriptionTypesEnabled(myNamespace, subscriptionTypeSet);

        subscriptionTypeSet.clear();
        subscriptionTypeSet.add(SubscriptionType.Failover);
        admin.topics().setSubscriptionTypesEnabled(topic, subscriptionTypeSet);

        try {
            pulsarClient.newConsumer().topic(topic)
                    .subscriptionType(SubscriptionType.Shared).subscriptionName("test").subscribe();
            fail();
        } catch (PulsarClientException pulsarClientException) {
            assertTrue(pulsarClientException instanceof PulsarClientException.NotAllowedException);
        }
    }

    @Test(timeOut = 20000)
    public void testGetCompactionThresholdApplied() throws Exception {
        final String topic = testTopic + UUID.randomUUID();
        pulsarClient.newProducer().topic(topic).create().close();
        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
        assertNull(admin.topics().getCompactionThreshold(topic));
        assertNull(admin.namespaces().getCompactionThreshold(myNamespace));
        long brokerPolicy = pulsar.getConfiguration().getBrokerServiceCompactionThresholdInBytes();
        assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
        long namespacePolicy = 10L;

        admin.namespaces().setCompactionThreshold(myNamespace, namespacePolicy);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getCompactionThreshold(myNamespace)));
        assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), namespacePolicy);

        long topicPolicy = 20L;
        admin.topics().setCompactionThreshold(topic, topicPolicy);
        Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getCompactionThreshold(topic)));
        assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), topicPolicy);

        admin.namespaces().removeCompactionThreshold(myNamespace);
        admin.topics().removeCompactionThreshold(topic);
        Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getCompactionThreshold(myNamespace)));
        Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getCompactionThreshold(topic)));
        assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
    }

}
