/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Starts 3 brokers that are in 3 different clusters
 */
@Test(groups = "broker")
public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {

    @Override
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        config1.setSystemTopicEnabled(true);
        config1.setDefaultNumberOfNamespaceBundles(1);
        config1.setTopicLevelPoliciesEnabled(true);
        config2.setSystemTopicEnabled(true);
        config2.setTopicLevelPoliciesEnabled(true);
        config2.setDefaultNumberOfNamespaceBundles(1);
        config3.setSystemTopicEnabled(true);
        config3.setTopicLevelPoliciesEnabled(true);
        config3.setDefaultNumberOfNamespaceBundles(1);
        super.setup();
    }

    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    @Test
    public void testReplicateQuotaTopicPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set BacklogQuota
        BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl();
        backlogQuota.setLimitSize(1);
        backlogQuota.setLimitTime(2);
        admin1.topicPolicies(true).setBacklogQuota(topic, backlogQuota);

        Awaitility.await().untilAsserted(() ->
                assertTrue(admin2.topicPolicies(true).getBacklogQuotaMap(topic).containsValue(backlogQuota)));
        Awaitility.await().untilAsserted(() ->
                assertTrue(admin3.topicPolicies(true).getBacklogQuotaMap(topic).containsValue(backlogQuota)));
        //remove BacklogQuota
        admin1.topicPolicies(true).removeBacklogQuota(topic);
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getBacklogQuotaMap(topic).size(), 0));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getBacklogQuotaMap(topic).size(), 0));
    }

    @Test
    public void testReplicateMessageTTLPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set message ttl
        admin1.topicPolicies(true).setMessageTTL(topic, 10);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), 10));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getMessageTTL(topic).intValue(), 10));
        //remove message ttl
        admin1.topicPolicies(true).removeMessageTTL(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getMessageTTL(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getMessageTTL(topic)));
    }

    @Test
    public void testReplicateSubscribeRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set global topic policy
        SubscribeRate subscribeRate = new SubscribeRate(100, 10000);
        admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate);

        // get global topic policy
        untilRemoteClustersAsserted(
                admin -> assertEquals(admin.topicPolicies(true).getSubscribeRate(topic), subscribeRate));

        // remove global topic policy
        admin1.topicPolicies(true).removeSubscribeRate(topic);
        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getSubscribeRate(topic)));
    }

    @Test
    public void testReplicateMaxMessageSizePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set global topic policy
        admin1.topicPolicies(true).setMaxMessageSize(topic, 1000);

        // get global topic policy
        untilRemoteClustersAsserted(
                admin -> assertEquals(admin.topicPolicies(true).getMaxMessageSize(topic), Integer.valueOf(1000)));

        // remove global topic policy
        admin1.topicPolicies(true).removeMaxMessageSize(topic);
        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getMaxMessageSize(topic)));
    }

    @Test
    public void testReplicatePublishRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set global topic policy
        PublishRate publishRate = new PublishRate(100, 10000);
        admin1.topicPolicies(true).setPublishRate(topic, publishRate);

        // get global topic policy
        untilRemoteClustersAsserted(
                admin -> assertEquals(admin.topicPolicies(true).getPublishRate(topic), publishRate));

        // remove global topic policy
        admin1.topicPolicies(true).removePublishRate(topic);
        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getPublishRate(topic)));
    }

    @Test
    public void testReplicateDeduplicationSnapshotIntervalPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set global topic policy
        admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, 100);

        // get global topic policy
        untilRemoteClustersAsserted(
                admin -> assertEquals(admin.topicPolicies(true).getDeduplicationSnapshotInterval(topic),
                        Integer.valueOf(100)));

        // remove global topic policy
        admin1.topicPolicies(true).removeDeduplicationSnapshotInterval(topic);
        untilRemoteClustersAsserted(
                admin -> assertNull(admin.topicPolicies(true).getDeduplicationSnapshotInterval(topic)));
    }

    private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> condition) {
        Awaitility.await().untilAsserted(() -> condition.apply(admin2));
        Awaitility.await().untilAsserted(() -> condition.apply(admin3));
    }

    private interface ThrowingConsumer<I> {
        void apply(I input) throws Throwable;
    }


    @Test
    public void testReplicatePersistentPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set PersistencePolicies
        PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000);
        admin1.topicPolicies(true).setPersistence(topic, policies);

        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getPersistence(topic), policies));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getPersistence(topic), policies));
        //remove PersistencePolicies
        admin1.topicPolicies(true).removePersistence(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getPersistence(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getPersistence(topic)));
    }

    @Test
    public void testReplicateDeduplicationStatusPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set subscription types policies
        admin1.topicPolicies(true).setDeduplicationStatus(topic, true);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertTrue(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
        // remove subscription types policies
        admin1.topicPolicies(true).removeDeduplicationStatus(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic)));

    }

    @Test
    public void testReplicatorMaxProducer() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);

        // set max producer policies
        admin1.topicPolicies(true).setMaxProducers(topic, 100);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getMaxProducers(topic).intValue(), 100));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getMaxProducers(topic).intValue(), 100));

        // remove max producer policies
        admin1.topicPolicies(true).removeMaxProducers(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getMaxProducers(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getMaxProducers(topic)));
    }

    @Test
    public void testReplicatorMaxConsumerPerSubPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, topic);
        // set max consumer per sub
        admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));

        Awaitility.await().untilAsserted(() -> {
            assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100);
            assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic));
        });

        //remove max consumer per sub
        admin1.topicPolicies(true).removeMaxConsumersPerSubscription(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
    }

    @Test
    public void testReplicateMaxUnackedMsgPerConsumer() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set max unacked msgs per consumers
        admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
        // remove max unacked msgs per consumers
        admin1.topicPolicies(true).removeMaxUnackedMessagesOnConsumer(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
    }

    @Test
    public void testReplicatorTopicPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, persistentTopicName);
        // set retention
        RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
        admin1.topicPolicies(true).setRetention(persistentTopicName, retentionPolicies);

        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies));

        Awaitility.await().untilAsserted(() -> {
            assertEquals(admin1.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies);
            assertNull(admin1.topicPolicies().getRetention(persistentTopicName));
        });

        //remove retention
        admin1.topicPolicies(true).removeRetention(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getRetention(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getRetention(persistentTopicName)));
    }
    @Test
    public void testReplicateSubscriptionTypesPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        Set<SubscriptionType> subscriptionTypes = new HashSet<>();
        subscriptionTypes.add(SubscriptionType.Shared);
        // set subscription types policies
        admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, subscriptionTypes);
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
        // remove subscription types policies
        admin1.topicPolicies(true).removeSubscriptionTypesEnabled(topic);
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));

    }


    @Test
    public void testReplicateMaxConsumers() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set max consumers
        admin1.topicPolicies(true).setMaxConsumers(topic, 100);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getMaxConsumers(topic).intValue(), 100));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getMaxConsumers(topic).intValue(), 100));
        // remove max consumers
        admin1.topicPolicies(true).removeMaxConsumers(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getMaxConsumers(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getMaxConsumers(topic)));
    }

    @Test
    public void testReplicatorMessageDispatchRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, persistentTopicName);
        // set dispatchRate
        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .dispatchThrottlingRateInMsg(2)
                .ratePeriodInSecond(3)
                .relativeToPublishRate(true)
                .build();
        admin1.topicPolicies(true).setDispatchRate(persistentTopicName, dispatchRate);

        // get dispatchRate
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getDispatchRate(persistentTopicName), dispatchRate));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getDispatchRate(persistentTopicName), dispatchRate));

        //remove dispatchRate
        admin1.topicPolicies(true).removeDispatchRate(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getDispatchRate(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName)));
    }

    @Test
    public void testReplicateDelayedDelivery() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        DelayedDeliveryPolicies policies = DelayedDeliveryPolicies.builder().active(true).tickTime(10000L).build();
        // set delayed delivery
        admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, policies);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
        // remove delayed delivery
        admin1.topicPolicies(true).removeDelayedDeliveryPolicy(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
    }

    @Test
    public void testReplicatorInactiveTopicPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, persistentTopicName);

        // set InactiveTopicPolicies
        InactiveTopicPolicies inactiveTopicPolicies =
                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies);
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true)
                        .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true)
                        .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
        // remove InactiveTopicPolicies
        admin1.topicPolicies(true).removeInactiveTopicPolicies(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
    }

    @Test
    public void testReplicatorSubscriptionDispatchRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, persistentTopicName);
        // set subscription dispatch rate
        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .ratePeriodInSecond(1)
                .dispatchThrottlingRateInByte(1)
                .relativeToPublishRate(true)
                .build();
        admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, dispatchRate);
        // get subscription dispatch rate
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true)
                        .getSubscriptionDispatchRate(persistentTopicName), dispatchRate));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true)
                        .getSubscriptionDispatchRate(persistentTopicName), dispatchRate));

        //remove subscription dispatch rate
        admin1.topicPolicies(true).removeSubscriptionDispatchRate(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
    }

    @Test
    public void testReplicateReplicatorDispatchRatePolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, persistentTopicName);
        // set replicator dispatch rate
        DispatchRate dispatchRate = DispatchRate.builder()
                .dispatchThrottlingRateInMsg(1)
                .ratePeriodInSecond(1)
                .dispatchThrottlingRateInByte(1)
                .relativeToPublishRate(true)
                .build();
        admin1.topicPolicies(true).setReplicatorDispatchRate(persistentTopicName, dispatchRate);
        // get replicator dispatch rate
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true)
                        .getReplicatorDispatchRate(persistentTopicName), dispatchRate));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true)
                        .getReplicatorDispatchRate(persistentTopicName), dispatchRate));

        //remove replicator dispatch rate
        admin1.topicPolicies(true).removeReplicatorDispatchRate(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getReplicatorDispatchRate(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getReplicatorDispatchRate(persistentTopicName)));
    }

    @Test
    public void testReplicateMaxUnackedMsgPerSub() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, topic);
        // set max unacked msgs per sub
        admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100);
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
        Awaitility.await().ignoreExceptions().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
        // remove max unacked msgs per sub
        admin1.topicPolicies(true).removeMaxUnackedMessagesOnSubscription(topic);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
    }

    @Test
    public void testReplicatorCompactionThresholdPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, persistentTopicName);
        // set compaction threshold
        admin1.topicPolicies(true).setCompactionThreshold(persistentTopicName, 1);
        // get compaction threshold
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true)
                        .getCompactionThreshold(persistentTopicName), Long.valueOf(1)));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true)
                        .getCompactionThreshold(persistentTopicName), Long.valueOf(1)));

        //remove compaction threshold
        admin1.topicPolicies(true).removeCompactionThreshold(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
    }

    @Test
    public void testReplicateMaxSubscriptionsPerTopic() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
        init(namespace, persistentTopicName);

        //set max subscriptions per topic
        admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 1024);

        //get max subscriptions per topic
        untilRemoteClustersAsserted(
                admin -> assertEquals(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName),
                        Integer.valueOf(1024)));

        //remove
        admin1.topicPolicies(true).removeMaxSubscriptionsPerTopic(persistentTopicName);
        untilRemoteClustersAsserted(
                admin -> assertNull(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName)));
    }

    @Test
    public void testReplicatorOffloadPolicies() throws Exception {
        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();

        init(namespace, persistentTopicName);
        OffloadPoliciesImpl offloadPolicies =
                OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null,
                8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST);

        // set offload policies
        try{
            admin1.topicPolicies(true).setOffloadPolicies(persistentTopicName, offloadPolicies);
        }catch (Exception exception){
            // driver not found exception.
            assertTrue(exception instanceof PulsarAdminException.ServerSideErrorException);
        }
        // get offload policies
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin2.topicPolicies(true).getOffloadPolicies(persistentTopicName), offloadPolicies));
        Awaitility.await().untilAsserted(() ->
                assertEquals(admin3.topicPolicies(true).getOffloadPolicies(persistentTopicName), offloadPolicies));

        //remove offload policies
        admin1.topicPolicies(true).removeOffloadPolicies(persistentTopicName);
        Awaitility.await().untilAsserted(() ->
                assertNull(admin2.topicPolicies(true).getOffloadPolicies(persistentTopicName)));
        Awaitility.await().untilAsserted(() ->
                assertNull(admin3.topicPolicies(true).getOffloadPolicies(persistentTopicName)));
    }


    private void init(String namespace, String topic)
            throws PulsarAdminException, PulsarClientException, PulsarServerException {
        final String cluster2 = pulsar2.getConfig().getClusterName();
        final String cluster1 = pulsar1.getConfig().getClusterName();
        final String cluster3 = pulsar3.getConfig().getClusterName();

        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
        // Create partitioned-topic from R1
        admin1.topics().createPartitionedTopic(topic, 3);
        // List partitioned topics from R2
        Awaitility.await().untilAsserted(() -> assertNotNull(admin2.topics().getPartitionedTopicList(namespace)));
        Awaitility.await().untilAsserted(() -> assertEquals(
                admin2.topics().getPartitionedTopicList(namespace).get(0), topic));
        assertEquals(admin1.topics().getList(namespace).size(), 3);
        // List partitioned topics from R3
        Awaitility.await().untilAsserted(() -> assertNotNull(admin3.topics().getPartitionedTopicList(namespace)));
        Awaitility.await().untilAsserted(() -> assertEquals(
                admin3.topics().getPartitionedTopicList(namespace).get(0), topic));

        pulsar1.getClient().newProducer().topic(topic).create().close();
        pulsar2.getClient().newProducer().topic(topic).create().close();
        pulsar3.getClient().newProducer().topic(topic).create().close();

        //init topic policies server
        Awaitility.await().ignoreExceptions().untilAsserted(() -> {
            assertNull(pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)));
            assertNull(pulsar2.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)));
            assertNull(pulsar3.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)));
        });
    }

}
