/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class NonPersistentTopicTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class);
    private final String configClusterName = "r1";

    @DataProvider(name = "subscriptionType")
    public Object[][] getSubscriptionType() {
        return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Exclusive } };
    }

    @DataProvider(name = "loadManager")
    public Object[][] getLoadManager() {
        return new Object[][] { { SimpleLoadManagerImpl.class.getCanonicalName() },
                { ModularLoadManagerImpl.class.getCanonicalName() } };
    }

    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut = 90000 /* 1.5mn */)
    public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicDoesNotExist() throws Exception {
        final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation();
        try {
            // Given the auto topic creation is disabled
            cleanup();
            conf.setAllowAutoTopicCreation(false);
            setup();

            final String topicPartitionName = "non-persistent://public/default/issue-9173-partition-0";

            // Then error when subscribe to a partition of a non-persistent topic that does not exist
            assertThrows(PulsarClientException.NotFoundException.class,
                    () -> pulsarClient.newConsumer().topic(topicPartitionName).subscriptionName("sub-issue-9173").subscribe());

            // Then error when produce to a partition of a non-persistent topic that does not exist
            try {
                pulsarClient.newProducer().topic(topicPartitionName).create();
                Assert.fail("Should failed due to topic not exist");
            } catch (Exception e) {
                assertTrue(e instanceof PulsarClientException.NotFoundException);
            }
        } finally {
            conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation);
        }
    }

    @Test(timeOut = 90000 /* 1.5mn */)
    public void testAutoCreateNonPersistentPartitionsWhenThePartitionedTopicExists() throws Exception {
        final boolean defaultAllowAutoTopicCreation = conf.isAllowAutoTopicCreation();
        try {
            // Given the auto topic creation is disabled
            cleanup();
            conf.setAllowAutoTopicCreation(false);
            setup();

            // Given the non-persistent partitioned topic exists
            final String topic = "non-persistent://public/default/issue-9173";
            admin.topics().createPartitionedTopic(topic, 3);

            // When subscribe, then a sub-consumer is created for each partition which means the partitions are created
            final MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
                    .topic(topic).subscriptionName("sub-issue-9173").subscribe();
            assertEquals(consumer.getConsumers().size(), 3);

            // When produce, a sub-producer is created for each partition which means the partitions are created
            PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic).create();
            assertEquals(producer.getProducers().size(), 3);

            consumer.close();
            producer.close();
        } finally {
            conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation);
        }
    }

    @Test(dataProvider = "subscriptionType")
    public void testNonPersistentTopic(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                .subscriptionName("subscriber-1").subscriptionType(type).subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

        int totalProduceMsg = 500;
        for (int i = 0; i < totalProduceMsg; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        producer.flush();

        Message<?> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            } else {
                break;
            }
        }
        assertEquals(messageSet.size(), totalProduceMsg);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);

    }

    @Test(dataProvider = "subscriptionType")
    public void testPartitionedNonPersistentTopic(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
        admin.topics().createPartitionedTopic(topic, 5);
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1")
                .subscriptionType(type).subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        int totalProduceMsg = 500;
        for (int i = 0; i < totalProduceMsg; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        producer.flush();

        Message<?> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            } else {
                break;
            }
        }
        assertEquals(messageSet.size(), totalProduceMsg);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);

    }

    @Test(dataProvider = "subscriptionType")
    public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final int numPartitions = 5;
        final String topic = "non-persistent://my-property/my-ns/partitioned-topic";
        admin.topics().createPartitionedTopic(topic, numPartitions);

        @Cleanup
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsar.getBrokerServiceUrl())
                .statsInterval(0, TimeUnit.SECONDS)
                .build();
        Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1")
                .subscriptionType(type).subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        // Ensure all partitions exist
        for (int i = 0; i < numPartitions; i++) {
            TopicName partition = TopicName.get(topic).getPartition(i);
            assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString()));
        }

        int totalProduceMsg = 500;
        for (int i = 0; i < totalProduceMsg; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        producer.flush();

        Message<?> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            } else {
                break;
            }
        }
        assertEquals(messageSet.size(), totalProduceMsg);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    }

    /**
     * It verifies that broker doesn't dispatch messages if consumer runs out of permits filled out with messages
     */
    @Test(dataProvider = "subscriptionType")
    public void testConsumerInternalQueueMaxOut(SubscriptionType type) throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        final int queueSize = 10;
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                .receiverQueueSize(queueSize).subscriptionName("subscriber-1").subscriptionType(type).subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

        int totalProduceMsg = 50;
        for (int i = 0; i < totalProduceMsg; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        producer.flush();

        Message<?> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer.receive(1, TimeUnit.SECONDS);
            if (msg != null) {
                consumer.acknowledge(msg);
                String receivedMessage = new String(msg.getData());
                log.debug("Received message: [{}]", receivedMessage);
                String expectedMessage = "my-message-" + i;
                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
            } else {
                break;
            }
        }
        assertEquals(messageSet.size(), queueSize);

        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);

    }

    /**
     * Verifies that broker should failed to publish message if producer publishes messages more than rate limit
     */
    @Test
    public void testProducerRateLimit() throws Exception {
        int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
        try {
            final String topic = "non-persistent://my-property/my-ns/unacked-topic";
            // restart broker with lower publish rate limit
            conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
            stopBroker();
            startBroker();
            // produce message concurrently
            @Cleanup("shutdownNow")
            ExecutorService executor = Executors.newFixedThreadPool(5);
            AtomicBoolean failed = new AtomicBoolean(false);
            Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1")
                    .subscribe();
            Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
            byte[] msgData = "testData".getBytes();
            final int totalProduceMessages = 10;
            CountDownLatch latch = new CountDownLatch(totalProduceMessages);
            for (int i = 0; i < totalProduceMessages; i++) {
                executor.submit(() -> {
                    try {
                        producer.send(msgData);
                    } catch (Exception e) {
                        log.error("Failed to send message", e);
                        failed.set(true);
                    }
                    latch.countDown();
                });
            }
            latch.await();

            Message<?> msg = null;
            Set<String> messageSet = Sets.newHashSet();
            for (int i = 0; i < totalProduceMessages; i++) {
                msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    messageSet.add(new String(msg.getData()));
                } else {
                    break;
                }
            }

            // publish should not be failed
            assertFalse(failed.get());
            // but as message should be dropped at broker: broker should not receive the message
            assertNotEquals(messageSet.size(), totalProduceMessages);

            producer.close();
        } finally {
            conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
        }
    }

    /**
     * verifies message delivery with multiple consumers on shared and failover subscriptions
     *
     * @throws Exception
     */
    @Test
    public void testMultipleSubscription() throws Exception {
        log.info("-- Starting {} test --", methodName);

        final String topic = "non-persistent://my-property/my-ns/unacked-topic";
        ConsumerImpl<byte[]> consumer1Shared = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe();

        ConsumerImpl<byte[]> consumer2Shared = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                .subscriptionName("subscriber-shared").subscriptionType(SubscriptionType.Shared).subscribe();

        ConsumerImpl<byte[]> consumer1FailOver = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                .subscriptionName("subscriber-fo").subscriptionType(SubscriptionType.Failover).subscribe();

        ConsumerImpl<byte[]> consumer2FailOver = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic)
                .subscriptionName("subscriber-fo").subscriptionType(SubscriptionType.Failover).subscribe();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

        int totalProduceMsg = 500;
        for (int i = 0; i < totalProduceMsg; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        producer.flush();

        // consume from shared-subscriptions
        Message<?> msg = null;
        Set<String> messageSet = Sets.newHashSet();
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer1Shared.receive(500, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messageSet.add(new String(msg.getData()));
            } else {
                break;
            }
        }
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer2Shared.receive(500, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messageSet.add(new String(msg.getData()));
            } else {
                break;
            }
        }
        assertEquals(messageSet.size(), totalProduceMsg);

        // consume from failover-subscriptions
        messageSet.clear();
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer1FailOver.receive(500, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messageSet.add(new String(msg.getData()));
            } else {
                break;
            }
        }
        for (int i = 0; i < totalProduceMsg; i++) {
            msg = consumer2FailOver.receive(500, TimeUnit.MILLISECONDS);
            if (msg != null) {
                messageSet.add(new String(msg.getData()));
            } else {
                break;
            }
        }
        assertEquals(messageSet.size(), totalProduceMsg);

        producer.close();
        consumer1Shared.close();
        consumer2Shared.close();
        consumer1FailOver.close();
        consumer2FailOver.close();
        log.info("-- Exiting {} test --", methodName);

    }

    /**
     * verifies that broker is capturing topic stats correctly
     */
    @Test
    public void testTopicStats() throws Exception {

        final String topicName = "non-persistent://my-property/my-ns/unacked-topic";
        final String subName = "non-persistent";
        final int timeWaitToSync = 100;

        NonPersistentTopicStats stats;
        SubscriptionStats subStats;

        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                .subscriptionType(SubscriptionType.Shared).subscriptionName(subName).subscribe();
        Thread.sleep(timeWaitToSync);

        NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
        assertNotNull(topicRef);

        rolloverPerIntervalStats(pulsar);
        stats = topicRef.getStats(false, false, false);
        subStats = stats.getSubscriptions().values().iterator().next();

        // subscription stats
        assertEquals(stats.getSubscriptions().keySet().size(), 1);
        assertEquals(subStats.getConsumers().size(), 1);

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
        Thread.sleep(timeWaitToSync);

        int totalProducedMessages = 100;
        for (int i = 0; i < totalProducedMessages; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }
        Thread.sleep(timeWaitToSync);

        rolloverPerIntervalStats(pulsar);
        stats = topicRef.getStats(false, false, false);
        subStats = stats.getSubscriptions().values().iterator().next();

        assertTrue(subStats.getMsgRateOut() > 0);
        assertEquals(subStats.getConsumers().size(), 1);
        assertTrue(subStats.getMsgThroughputOut() > 0);

        // consumer stats
        assertTrue(subStats.getConsumers().get(0).getMsgRateOut() > 0.0);
        assertTrue(subStats.getConsumers().get(0).getMsgThroughputOut() > 0.0);
        assertEquals(subStats.getMsgRateRedeliver(), 0.0);
        producer.close();
        consumer.close();

    }

    /**
     * verifies that non-persistent topic replicates using replicator
     */
    @Test
    public void testReplicator() throws Exception {

        ReplicationClusterManager replication = new ReplicationClusterManager();
        replication.setupReplicationCluster();
        try {
            final String globalTopicName = "non-persistent://pulsar/global/ns/nonPersistentTopic";
            final int timeWaitToSync = 100;

            NonPersistentTopicStats stats;
            SubscriptionStats subStats;

            @Cleanup
            PulsarClient client1 = PulsarClient.builder().serviceUrl(replication.url1.toString()).build();
            @Cleanup
            PulsarClient client2 = PulsarClient.builder().serviceUrl(replication.url2.toString()).build();
            @Cleanup
            PulsarClient client3 = PulsarClient.builder().serviceUrl(replication.url3.toString()).build();

            ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) client1.newConsumer().topic(globalTopicName)
                    .subscriptionName("subscriber-1").subscribe();
            ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) client1.newConsumer().topic(globalTopicName)
                    .subscriptionName("subscriber-2").subscribe();

            ConsumerImpl<byte[]> repl2Consumer = (ConsumerImpl<byte[]>) client2.newConsumer().topic(globalTopicName)
                    .subscriptionName("subscriber-1").subscribe();
            ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) client3.newConsumer().topic(globalTopicName)
                    .subscriptionName("subscriber-1").subscribe();

            Producer<byte[]> producer = client1.newProducer().topic(globalTopicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();

            Thread.sleep(timeWaitToSync);

            PulsarService replicationPulasr = replication.pulsar1;

            // Replicator for r1 -> r2,r3
            NonPersistentTopic topicRef = (NonPersistentTopic) replication.pulsar1.getBrokerService()
                    .getTopicReference(globalTopicName).get();
            NonPersistentReplicator replicatorR2 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r2");
            NonPersistentReplicator replicatorR3 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r3");
            assertNotNull(topicRef);
            assertNotNull(replicatorR2);
            assertNotNull(replicatorR3);

            rolloverPerIntervalStats(replicationPulasr);
            stats = topicRef.getStats(false, false, false);
            subStats = stats.getSubscriptions().values().iterator().next();

            // subscription stats
            assertEquals(stats.getSubscriptions().keySet().size(), 2);
            assertEquals(subStats.getConsumers().size(), 1);

            Thread.sleep(timeWaitToSync);

            int totalProducedMessages = 100;
            for (int i = 0; i < totalProducedMessages; i++) {
                String message = "my-message-" + i;
                producer.send(message.getBytes());
            }

            // (1) consume by consumer1
            Message<?> msg = null;
            Set<String> messageSet = Sets.newHashSet();
            for (int i = 0; i < totalProducedMessages; i++) {
                msg = consumer1.receive(300, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    String receivedMessage = new String(msg.getData());
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
                } else {
                    break;
                }
            }
            assertEquals(messageSet.size(), totalProducedMessages);

            // (2) consume by consumer2
            messageSet.clear();
            for (int i = 0; i < totalProducedMessages; i++) {
                msg = consumer2.receive(300, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    String receivedMessage = new String(msg.getData());
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
                } else {
                    break;
                }
            }
            assertEquals(messageSet.size(), totalProducedMessages);

            // (3) consume by repl2consumer
            messageSet.clear();
            for (int i = 0; i < totalProducedMessages; i++) {
                msg = repl2Consumer.receive(300, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    String receivedMessage = new String(msg.getData());
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
                } else {
                    break;
                }
            }
            assertEquals(messageSet.size(), totalProducedMessages);

            // (4) consume by repl3consumer
            messageSet.clear();
            for (int i = 0; i < totalProducedMessages; i++) {
                msg = repl3Consumer.receive(300, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    String receivedMessage = new String(msg.getData());
                    testMessageOrderAndDuplicates(messageSet, receivedMessage, "my-message-" + i);
                } else {
                    break;
                }
            }
            assertEquals(messageSet.size(), totalProducedMessages);

            Thread.sleep(timeWaitToSync);

            rolloverPerIntervalStats(replicationPulasr);
            stats = topicRef.getStats(false, false, false);
            subStats = stats.getSubscriptions().values().iterator().next();

            assertTrue(subStats.getMsgRateOut() > 0);
            assertEquals(subStats.getConsumers().size(), 1);
            assertTrue(subStats.getMsgThroughputOut() > 0);

            // consumer stats
            assertTrue(subStats.getConsumers().get(0).getMsgRateOut() > 0.0);
            assertTrue(subStats.getConsumers().get(0).getMsgThroughputOut() > 0.0);
            assertEquals(subStats.getMsgRateRedeliver(), 0.0);

            producer.close();
            consumer1.close();
            repl2Consumer.close();
            repl3Consumer.close();
        } finally {
            replication.shutdownReplicationCluster();
        }

    }

    /**
     * verifies load manager assigns topic only if broker started in non-persistent mode
     *
     * <pre>
     * 1. Start broker with disable non-persistent topic mode
     * 2. Create namespace with non-persistency set
     * 3. Create non-persistent topic
     * 4. Load-manager should not be able to find broker
     * 5. Create producer on that topic should fail
     * </pre>
     */
    @Test(dataProvider = "loadManager")
    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName) throws Exception {

        final String namespace = "my-property/my-ns";
        final String topicName = "non-persistent://" + namespace + "/loadManager";
        final String defaultLoadManagerName = conf.getLoadManagerClassName();
        final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
        try {
            // start broker to not own non-persistent namespace and create non-persistent namespace
            stopBroker();
            conf.setEnableNonPersistentTopics(false);
            conf.setLoadManagerClassName(loadManagerName);
            startBroker();

            Field field = PulsarService.class.getDeclaredField("loadManager");
            field.setAccessible(true);
            @SuppressWarnings("unchecked")
            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar);
            LoadManager manager = LoadManager.create(pulsar);
            manager.start();
            LoadManager oldLoadManager = loadManagerRef.getAndSet(manager);
            oldLoadManager.stop();

            NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
            LoadManager loadManager = pulsar.getLoadManager().get();
            ResourceUnit broker = null;
            try {
                broker = loadManager.getLeastLoaded(fdqn).get();
            } catch (Exception e) {
                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker)
            }
            assertNull(broker);

            try {
                Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1,
                        TimeUnit.SECONDS);
                producer.close();
                fail("topic loading should have failed");
            } catch (Exception e) {
                // Ok
            }
            assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        } finally {
            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
            conf.setLoadManagerClassName(defaultLoadManagerName);
        }

    }

    /**
     * verifies: broker should reject non-persistent topic loading if broker is not enable for non-persistent topic
     *
     * @throws Exception
     */
    @Test
    public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {

        final String namespace = "my-property/my-ns";
        final String topicName = "non-persistent://" + namespace + "/persitentNamespace";

        final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
        try {
            conf.setEnableNonPersistentTopics(false);
            stopBroker();
            startBroker();
            try {
                Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1,
                        TimeUnit.SECONDS);
                producer.close();
                fail("topic loading should have failed");
            } catch (Exception e) {
                // Ok
            }

            assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
        } finally {
            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
        }
    }

    /**
     * verifies that broker started with onlyNonPersistent mode doesn't own persistent-topic
     *
     * @param loadManagerName
     * @throws Exception
     */
    @Test(dataProvider = "loadManager")
    public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName) throws Exception {

        final String namespace = "my-property/my-ns";
        final String topicName = "persistent://" + namespace + "/loadManager";
        final String defaultLoadManagerName = conf.getLoadManagerClassName();
        final boolean defaultEnablePersistentTopic = conf.isEnablePersistentTopics();
        final boolean defaultEnableNonPersistentTopic = conf.isEnableNonPersistentTopics();
        try {
            // start broker to not own non-persistent namespace and create non-persistent namespace
            stopBroker();
            conf.setEnableNonPersistentTopics(true);
            conf.setEnablePersistentTopics(false);
            conf.setLoadManagerClassName(loadManagerName);
            startBroker();

            Field field = PulsarService.class.getDeclaredField("loadManager");
            field.setAccessible(true);
            @SuppressWarnings("unchecked")
            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar);
            LoadManager manager = LoadManager.create(pulsar);
            manager.start();
            LoadManager oldLoadManager = loadManagerRef.getAndSet(manager);
            oldLoadManager.stop();

            NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
            LoadManager loadManager = pulsar.getLoadManager().get();
            ResourceUnit broker = null;
            try {
                broker = loadManager.getLeastLoaded(fdqn).get();
            } catch (Exception e) {
                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker)
            }
            assertNull(broker);

            try {
                Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).createAsync().get(1,
                        TimeUnit.SECONDS);
                producer.close();
                fail("topic loading should have failed");
            } catch (Exception e) {
                // Ok
            }

            assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

        } finally {
            conf.setEnablePersistentTopics(defaultEnablePersistentTopic);
            conf.setEnableNonPersistentTopics(defaultEnableNonPersistentTopic);
            conf.setLoadManagerClassName(defaultLoadManagerName);
        }

    }

    /**
     * Verifies msg-drop stats
     *
     * @throws Exception
     */
    @Test
    public void testMsgDropStat() throws Exception {

        int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
        try {
            final String topicName = "non-persistent://my-property/my-ns/stats-topic";
            // restart broker with lower publish rate limit
            conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
            stopBroker();
            startBroker();
            Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
                    .receiverQueueSize(1).subscribe();

            Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
                    .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();

            ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
            @Cleanup("shutdownNow")
            ExecutorService executor = Executors.newFixedThreadPool(5);
            byte[] msgData = "testData".getBytes();
            final int totalProduceMessages = 200;
            CountDownLatch latch = new CountDownLatch(totalProduceMessages);
            for (int i = 0; i < totalProduceMessages; i++) {
                executor.submit(() -> {
                    producer.sendAsync(msgData).handle((msg, e) -> {
                        latch.countDown();
                        return null;
                    });
                });
            }
            latch.await();

            NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
            pulsar.getBrokerService().updateRates();
            NonPersistentTopicStats stats = topic.getStats(false, false, false);
            NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
            NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
            NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
            assertTrue(npStats.getMsgDropRate() > 0);
            assertTrue(sub1Stats.getMsgDropRate() > 0);
            assertTrue(sub2Stats.getMsgDropRate() > 0);

            producer.close();
            consumer.close();
            consumer2.close();
        } finally {
            conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
        }
    }

    class ReplicationClusterManager {
        URL url1;
        PulsarService pulsar1;
        BrokerService ns1;

        PulsarAdmin admin1;
        LocalBookkeeperEnsemble bkEnsemble1;

        URL url2;
        ServiceConfiguration config2;
        PulsarService pulsar2;
        BrokerService ns2;
        PulsarAdmin admin2;
        LocalBookkeeperEnsemble bkEnsemble2;

        URL url3;
        ServiceConfiguration config3;
        PulsarService pulsar3;
        BrokerService ns3;
        PulsarAdmin admin3;
        LocalBookkeeperEnsemble bkEnsemble3;

        ZookeeperServerTest globalZkS;

        ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());

        static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;

        // Default frequency
        public int getBrokerServicePurgeInactiveFrequency() {
            return 60;
        }

        public boolean isBrokerServicePurgeInactiveTopic() {
            return false;
        }

        void setupReplicationCluster() throws Exception {
            log.info("--- Starting ReplicatorTestBase::setup ---");
            globalZkS = new ZookeeperServerTest(0);
            globalZkS.start();

            // Start region 1
            bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            bkEnsemble1.start();

            // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
            // completely
            // independent config objects instead of referring to the same properties object
            ServiceConfiguration config1 = new ServiceConfiguration();
            config1.setClusterName(configClusterName);
            config1.setAdvertisedAddress("localhost");
            config1.setWebServicePort(Optional.of(0));
            config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
            config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
            config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
            config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
                    inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
            config1.setBrokerShutdownTimeoutMs(0L);
            config1.setBrokerServicePort(Optional.of(0));
            config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            config1.setAllowAutoTopicCreationType("non-partitioned");
            pulsar1 = new PulsarService(config1);
            pulsar1.start();
            ns1 = pulsar1.getBrokerService();

            url1 = new URL(pulsar1.getWebServiceAddress());
            admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();

            // Start region 2

            // Start zk & bks
            bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            bkEnsemble2.start();

            config2 = new ServiceConfiguration();
            config2.setClusterName("r2");
            config2.setWebServicePort(Optional.of(0));
            config2.setAdvertisedAddress("localhost");
            config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
            config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
            config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
            config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
                    inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
            config2.setBrokerShutdownTimeoutMs(0L);
            config2.setBrokerServicePort(Optional.of(0));
            config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
            config2.setAllowAutoTopicCreationType("non-partitioned");
            pulsar2 = new PulsarService(config2);
            pulsar2.start();
            ns2 = pulsar2.getBrokerService();

            url2 = new URL(pulsar2.getWebServiceAddress());
            admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();

            // Start region 3

            // Start zk & bks
            bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
            bkEnsemble3.start();

            config3 = new ServiceConfiguration();
            config3.setClusterName("r3");
            config3.setWebServicePort(Optional.of(0));
            config3.setAdvertisedAddress("localhost");
            config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
            config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
            config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
            config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
                    inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
            config3.setBrokerShutdownTimeoutMs(0L);
            config3.setBrokerServicePort(Optional.of(0));
            config3.setAllowAutoTopicCreationType("non-partitioned");
            pulsar3 = new PulsarService(config3);
            pulsar3.start();
            ns3 = pulsar3.getBrokerService();

            url3 = new URL(pulsar3.getWebServiceAddress());
            admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();

            // Provision the global namespace
            admin1.clusters().createCluster("r1", ClusterData.builder()
                    .serviceUrl(url1.toString())
                    .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
                    .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
                    .build());
            admin1.clusters().createCluster("r2", ClusterData.builder()
                    .serviceUrl(url2.toString())
                    .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
                    .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
                    .build());
            admin1.clusters().createCluster("r3", ClusterData.builder()
                    .serviceUrl(url3.toString())
                    .brokerServiceUrl(pulsar3.getBrokerServiceUrl())
                    .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
                    .build());

            admin1.clusters().createCluster("global", ClusterData.builder().serviceUrl("http://global:8080").build());
            admin1.tenants().createTenant("pulsar", new TenantInfoImpl(
                    Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
            admin1.namespaces().createNamespace("pulsar/global/ns");
            admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns",
                    Sets.newHashSet("r1", "r2", "r3"));

            assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
            assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
            assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
            assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
            assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
            assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
            Thread.sleep(100);
            log.info("--- ReplicatorTestBase::setup completed ---");

        }

        private int inSec(int time, TimeUnit unit) {
            return (int) TimeUnit.SECONDS.convert(time, unit);
        }

        void shutdownReplicationCluster() throws Exception {
            log.info("--- Shutting down ---");
            executor.shutdownNow();

            admin1.close();
            admin2.close();
            admin3.close();

            pulsar3.close();
            ns3.close();

            pulsar2.close();
            ns2.close();

            pulsar1.close();
            ns1.close();

            bkEnsemble1.stop();
            bkEnsemble2.stop();
            bkEnsemble3.stop();
            globalZkS.stop();
        }
    }

    private void rolloverPerIntervalStats(PulsarService pulsar) {
        try {
            pulsar.getExecutor().submit(() -> pulsar.getBrokerService().updateRates()).get();
        } catch (Exception e) {
            log.error("Stats executor error", e);
        }
    }
}
