/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.stats;

import static com.google.common.base.Preconditions.checkArgument;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.MoreObjects;
import com.google.common.base.Splitter;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.jsonwebtoken.SignatureAlgorithm;
import io.prometheus.client.Collector;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.zookeeper.CreateMode;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class PrometheusMetricsTest extends BrokerTestBase {

    /**
     * Runs before every test method: performs the base setup inherited from the parent
     * test class and then resets the metrics of {@link AuthenticationProviderToken}.
     *
     * @throws Exception if the base setup fails
     */
    @BeforeMethod(alwaysRun = true)
    @Override
    protected void setup() throws Exception {
        super.baseSetup();
        AuthenticationProviderToken.resetMetrics();
    }

    /**
     * Builds the broker configuration for these tests on top of the parent defaults.
     *
     * @return the parent's default configuration with topic-level policies and the system topic
     *         disabled and the broker shutdown timeout set to 5000 ms
     */
    @Override
    protected ServiceConfiguration getDefaultConf() {
        final ServiceConfiguration defaults = super.getDefaultConf();
        defaults.setTopicLevelPoliciesEnabled(false);
        defaults.setSystemTopicEnabled(false);
        // Wait for the broker to shut down. Metrics could otherwise be unregistered
        // asynchronously, which would affect the next test method and cause flakiness.
        defaults.setBrokerShutdownTimeoutMs(5000L);
        return defaults;
    }

    /**
     * Runs after every test method and delegates to the parent's internal cleanup.
     * Some tests in this class also call it directly before reconfiguring and calling
     * {@link #setup()} again.
     *
     * @throws Exception if the internal cleanup fails
     */
    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks the publish rate limit counter metric twice: first with the precise topic
     * publish rate limiter enabled, then with it disabled.
     */
    @Test
    public void testPublishRateLimitedTimes() throws Exception {
        for (boolean preciseRateLimit : new boolean[] {true, false}) {
            checkPublishRateLimitedTimes(preciseRateLimit);
        }
    }

    /**
     * Tears the test environment down, sets it up again with very low publish rate limits and
     * checks the {@code pulsar_publish_rate_limit_times} metric before and after
     * {@code updateRates()} has been invoked on the broker service.
     *
     * @param preciseRateLimit {@code true} to enable the precise topic publish rate limiter with
     *                         per-topic limits of 1, {@code false} to use broker-level limits of 1
     * @throws Exception if the environment cannot be recreated or a client/admin call fails
     */
    private void checkPublishRateLimitedTimes(boolean preciseRateLimit) throws Exception {
        cleanup();
        if (!preciseRateLimit) {
            conf.setBrokerPublisherThrottlingTickTimeMillis(1);
            conf.setBrokerPublisherThrottlingMaxMessageRate(1);
            conf.setBrokerPublisherThrottlingMaxByteRate(1);
        } else {
            conf.setBrokerPublisherThrottlingTickTimeMillis(10000000);
            conf.setMaxPublishRatePerTopicInMessages(1);
            conf.setMaxPublishRatePerTopicInBytes(1);
            conf.setBrokerPublisherThrottlingMaxMessageRate(100000);
            conf.setBrokerPublisherThrottlingMaxByteRate(10000000);
        }
        conf.setStatsUpdateFrequencyInSecs(100000000);
        conf.setPreciseTopicPublishRateLimiterEnable(preciseRateLimit);
        setup();

        final String namespace = "prop/ns-abc1" + UUID.randomUUID();
        admin.namespaces().createNamespace(namespace, 1);
        final String topicPrefix = "persistent://" + namespace + "/metrics";
        final String firstTopic = topicPrefix + UUID.randomUUID();
        final String secondTopic = topicPrefix + UUID.randomUUID();
        final String thirdTopic = topicPrefix + UUID.randomUUID();
        // The third producer is created through a separate client so it uses another connection.
        @Cleanup
        PulsarClient separateClient = newPulsarClient(lookupUrl.toString(), 0);

        Producer<byte[]> firstProducer = pulsarClient.newProducer()
                .producerName("my-pub")
                .enableBatching(false)
                .topic(firstTopic)
                .create();
        Producer<byte[]> secondProducer = pulsarClient.newProducer()
                .producerName("my-pub-2")
                .enableBatching(false)
                .topic(secondTopic)
                .create();
        Producer<byte[]> thirdProducer = separateClient.newProducer()
                .producerName("my-pub-2")
                .enableBatching(false)
                .topic(thirdTopic)
                .create();
        firstProducer.sendAsync(new byte[11]);

        PersistentTopic brokerTopic = (PersistentTopic) pulsar.getBrokerService()
                .getTopic(firstTopic, false).get().get();
        // The counter is read reflectively from AbstractTopic.
        Field limitedTimesField = AbstractTopic.class.getDeclaredField("publishRateLimitedTimes");
        limitedTimesField.setAccessible(true);
        Awaitility.await().untilAsserted(
                () -> assertEquals((long) limitedTimesField.get(brokerTopic), 1));

        @Cleanup
        ByteArrayOutputStream firstScrape = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, firstScrape);
        Multimap<String, Metric> beforeReset = parseMetrics(firstScrape.toString());
        assertTrue(beforeReset.containsKey("pulsar_publish_rate_limit_times"));
        for (Metric sample : beforeReset.get("pulsar_publish_rate_limit_times")) {
            if (!namespace.equals(sample.tags.get("namespace"))) {
                continue;
            }
            if (sample.tags.get("topic").equals(firstTopic)
                    || sample.tags.get("topic").equals(secondTopic)) {
                assertEquals(sample.value, 1);
            } else if (sample.tags.get("topic").equals(thirdTopic)) {
                // With precise rate limiting only the topic's own limit is triggered, so a topic
                // on a different connection reports 0 rate limited times.
                // With asynchronous rate limiting the broker-level limit is triggered and all
                // connections are limited at that time.
                assertEquals(sample.value, preciseRateLimit ? 0 : 1);
            } else {
                fail("should not fail");
            }
        }

        // Stats updater will reset the stats
        pulsar.getBrokerService().updateRates();
        Awaitility.await().untilAsserted(
                () -> assertEquals((long) limitedTimesField.get(brokerTopic), 0));

        @Cleanup
        ByteArrayOutputStream secondScrape = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, secondScrape);
        Multimap<String, Metric> afterReset = parseMetrics(secondScrape.toString());
        assertTrue(afterReset.containsKey("pulsar_publish_rate_limit_times"));
        for (Metric sample : afterReset.get("pulsar_publish_rate_limit_times")) {
            if (namespace.equals(sample.tags.get("namespace"))) {
                assertEquals(sample.value, 0);
            }
        }

        firstProducer.close();
        secondProducer.close();
        thirdProducer.close();
    }

    /**
     * Creates six topics in one namespace and three in another, then checks the per-namespace
     * {@code pulsar_topics_count} values and the number of topic load time samples emitted.
     */
    @Test
    public void testMetricsTopicCount() throws Exception {
        final String firstNamespace = "prop/ns-abc1";
        final String secondNamespace = "prop/ns-abc2";
        admin.namespaces().createNamespace(firstNamespace);
        admin.namespaces().createNamespace(secondNamespace);
        final String firstTopicPrefix = "persistent://" + firstNamespace + "/testMetricsTopicCount";
        final String secondTopicPrefix = "persistent://" + secondNamespace + "/testMetricsTopicCount";
        for (int created = 0; created < 6; created++) {
            admin.topics().createNonPartitionedTopic(firstTopicPrefix + UUID.randomUUID());
        }
        for (int created = 0; created < 3; created++) {
            admin.topics().createNonPartitionedTopic(secondTopicPrefix + UUID.randomUUID());
        }
        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

        @Cleanup
        ByteArrayOutputStream scrape = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, scrape);
        Multimap<String, Metric> metrics = parseMetrics(scrape.toString());

        for (Metric sample : metrics.get("pulsar_topics_count")) {
            if (firstNamespace.equals(sample.tags.get("namespace"))) {
                assertEquals(sample.value, 6.0);
            }
            if (secondNamespace.equals(sample.tags.get("namespace"))) {
                assertEquals(sample.value, 3.0);
            }
        }

        // The load time metrics are checked under both their unprefixed and "pulsar_" names.
        assertEquals(metrics.get("topic_load_times").size(), 6);
        assertEquals(metrics.get("topic_load_times_count").size(), 1);
        assertEquals(metrics.get("pulsar_topic_load_times").size(), 6);
        assertEquals(metrics.get("pulsar_topic_load_times_count").size(), 1);
    }

    /**
     * Overrides the broker-side producer stats (message rate 10, throughput 100) and checks that
     * exactly one {@code pulsar_average_msg_size} sample is emitted, with value 10 for the test
     * namespace (presumably throughput divided by rate; the computation is not visible here).
     */
    @Test
    public void testMetricsAvgMsgSize2() throws Exception {
        String ns1 = "prop/ns-abc1";
        admin.namespaces().createNamespace(ns1, 1);
        String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
        String topicName = baseTopic1 + UUID.randomUUID();
        // Typed instead of raw, and closed via @Cleanup so it is released even when an
        // assertion below fails.
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer().producerName("my-pub")
                .topic(topicName).create();
        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
                .getTopic(topicName, false).get().get();
        // Broker-side counterpart of the client producer, looked up by producer name.
        org.apache.pulsar.broker.service.Producer producerInServer = persistentTopic.getProducers().get("my-pub");
        producerInServer.getStats().msgRateIn = 10;
        producerInServer.getStats().msgThroughputIn = 100;
        @Cleanup
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, true, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
        assertTrue(metrics.containsKey("pulsar_average_msg_size"));
        Collection<Metric> avgMsgSizes = metrics.get("pulsar_average_msg_size");
        assertEquals(avgMsgSizes.size(), 1);
        for (Metric item : avgMsgSizes) {
            if (ns1.equals(item.tags.get("namespace"))) {
                assertEquals(item.value, 10);
            }
        }
    }

    /**
     * Publishes and consumes ten messages on each of two topics and verifies that the per-topic
     * metrics are emitted once per topic with the expected {@code topic}, {@code namespace} and
     * (for outbound metrics) {@code subscription} tags, and that the topic load metrics carry the
     * {@code cluster} tag.
     */
    @Test
    public void testPerTopicStats() throws Exception {
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();

        Consumer<byte[]> c1 = pulsarClient.newConsumer()
                .topic("persistent://my-property/use/my-ns/my-topic1")
                .subscriptionName("test")
                .subscribe();

        Consumer<byte[]> c2 = pulsarClient.newConsumer()
                .topic("persistent://my-property/use/my-ns/my-topic2")
                .subscriptionName("test")
                .subscribe();

        final int messages = 10;

        for (int i = 0; i < messages; i++) {
            String message = "my-message-" + i;
            p1.send(message.getBytes());
            p2.send(message.getBytes());
        }

        for (int i = 0; i < messages; i++) {
            c1.acknowledge(c1.receive());
            c2.acknowledge(c2.receive());
        }

        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);

        // Dump every parsed sample to ease debugging of failures.
        metrics.entries().forEach(e -> {
            System.out.println(e.getKey() + ": " + e.getValue());
        });

        // There should be 2 metrics with different tags for each topic
        List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_write_latency_le_1");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

        // Both entries are verified here: the first is my-topic2 and the second my-topic1,
        // the same order asserted for every other per-topic metric in this test.
        cm = (List<Metric>) metrics.get("pulsar_producers_count");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

        cm = (List<Metric>) metrics.get("topic_load_times_count");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");

        cm = (List<Metric>) metrics.get("topic_load_failed_total");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");

        cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

        cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");

        // Outbound metrics additionally carry the subscription name.
        cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(0).tags.get("subscription"), "test");
        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(1).tags.get("subscription"), "test");

        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(0).tags.get("subscription"), "test");
        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
        assertEquals(cm.get(1).tags.get("subscription"), "test");

        p1.close();
        p2.close();
        c1.close();
        c2.close();
    }

    /**
     * Publishes and consumes ten messages on each of two topics and verifies that every
     * broker-level metric is emitted exactly once and is tagged with the cluster {@code test}.
     */
    @Test
    public void testPerBrokerStats() throws Exception {
        final String firstTopic = "persistent://my-property/use/my-ns/my-topic1";
        final String secondTopic = "persistent://my-property/use/my-ns/my-topic2";

        Producer<byte[]> firstProducer = pulsarClient.newProducer().topic(firstTopic).create();
        Producer<byte[]> secondProducer = pulsarClient.newProducer().topic(secondTopic).create();

        Consumer<byte[]> firstConsumer = pulsarClient.newConsumer()
                .topic(firstTopic)
                .subscriptionName("test")
                .subscribe();
        Consumer<byte[]> secondConsumer = pulsarClient.newConsumer()
                .topic(secondTopic)
                .subscriptionName("test")
                .subscribe();

        final int messages = 10;

        for (int sent = 0; sent < messages; sent++) {
            String payload = "my-message-" + sent;
            firstProducer.send(payload.getBytes());
            secondProducer.send(payload.getBytes());
        }

        for (int received = 0; received < messages; received++) {
            firstConsumer.acknowledge(firstConsumer.receive());
            secondConsumer.acknowledge(secondConsumer.receive());
        }

        ByteArrayOutputStream scrape = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, scrape);
        Multimap<String, Metric> metrics = parseMetrics(scrape.toString());

        // Each broker-level metric must appear once and carry the cluster tag.
        final List<String> brokerMetricNames = List.of(
                "pulsar_broker_topics_count",
                "pulsar_broker_subscriptions_count",
                "pulsar_broker_producers_count",
                "pulsar_broker_consumers_count",
                "pulsar_broker_rate_in",
                "pulsar_broker_rate_out",
                "pulsar_broker_throughput_in",
                "pulsar_broker_throughput_out",
                "pulsar_broker_storage_size",
                "pulsar_broker_storage_logical_size",
                "pulsar_broker_storage_write_rate",
                "pulsar_broker_storage_read_rate",
                "pulsar_broker_msg_backlog");
        for (String metricName : brokerMetricNames) {
            Collection<Metric> samples = metrics.get(metricName);
            assertEquals(samples.size(), 1);
            assertEquals(samples.iterator().next().tags.get("cluster"), "test");
        }

        firstProducer.close();
        secondProducer.close();
        firstConsumer.close();
        secondConsumer.close();
    }

    /**
     * Verifies that a topic's total in/out message and byte counters keep accumulating across a
     * consumer disconnect: five messages are consumed, the consumer is closed, five more are
     * published and consumed by a new consumer on the same subscription, and the totals must
     * cover all ten messages.
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testPerTopicStatsReconnect() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";
        final String namespace = "my-property/use/my-ns";

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();

        Consumer<byte[]> firstConsumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("test")
                .subscribe();

        final int messages = 5;
        final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message
        final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead;
        // Two rounds of publishing and consuming are performed below.
        final int expectedMessages = messages * 2;
        final int expectedBytes = messageSizeBytes * messages * 2;

        for (int sent = 0; sent < messages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }
        for (int received = 0; received < messages; received++) {
            firstConsumer.acknowledge(firstConsumer.receive());
        }

        firstConsumer.close();

        for (int sent = 0; sent < messages; sent++) {
            producer.send(("my-message-" + sent).getBytes());
        }

        Consumer<byte[]> secondConsumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("test")
                .subscribe();

        for (int received = 0; received < messages; received++) {
            secondConsumer.acknowledge(secondConsumer.receive());
        }

        producer.close();
        secondConsumer.close();

        ByteArrayOutputStream scrape = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, scrape);
        Multimap<String, Metric> metrics = parseMetrics(scrape.toString());

        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        List<Metric> samples = (List<Metric>) metrics.get("pulsar_in_bytes_total");
        assertEquals(samples.size(), 1);
        assertEquals(samples.get(0).value, expectedBytes);
        assertEquals(samples.get(0).tags.get("topic"), topic);
        assertEquals(samples.get(0).tags.get("namespace"), namespace);

        samples = (List<Metric>) metrics.get("pulsar_in_messages_total");
        assertEquals(samples.size(), 1);
        assertEquals(samples.get(0).value, expectedMessages);
        assertEquals(samples.get(0).tags.get("topic"), topic);
        assertEquals(samples.get(0).tags.get("namespace"), namespace);

        samples = (List<Metric>) metrics.get("pulsar_out_bytes_total");
        assertEquals(samples.size(), 1);
        assertEquals(samples.get(0).value, expectedBytes);
        assertEquals(samples.get(0).tags.get("topic"), topic);
        assertEquals(samples.get(0).tags.get("namespace"), namespace);
        assertEquals(samples.get(0).tags.get("subscription"), "test");

        samples = (List<Metric>) metrics.get("pulsar_out_messages_total");
        assertEquals(samples.size(), 1);
        assertEquals(samples.get(0).value, expectedMessages);
        assertEquals(samples.get(0).tags.get("topic"), topic);
        assertEquals(samples.get(0).tags.get("namespace"), namespace);
        assertEquals(samples.get(0).tags.get("subscription"), "test");
    }

    /**
     * Data provider with two rows, each holding a single boolean argument:
     * {@code true} first, then {@code false}.
     *
     * @return the parameter rows for tests using the {@code cacheEnable} provider
     */
    @DataProvider(name = "cacheEnable")
    public static Object[][] cacheEnable() {
        return new Object[][] {
                { true },
                { false }
        };
    }

    /**
     * Verifies the read cache miss rate metrics at topic/namespace level, broker level and
     * managed ledger level, both with and without per-topic metrics.
     *
     * <p>The environment is recreated with a managed ledger cache of 1 MB or 0 MB, two 2 MB
     * messages are published and consumed, and the expected miss rate is 1.0 with the cache
     * enabled and 2.0 with it disabled.
     *
     * @param cacheEnable whether the managed ledger cache is given a non-zero size
     * @throws Exception if the environment cannot be recreated or a client/admin call fails
     */
    @Test(dataProvider = "cacheEnable")
    public void testStorageReadCacheMissesRate(boolean cacheEnable) throws Exception {
        cleanup();
        conf.setManagedLedgerStatsPeriodSeconds(Integer.MAX_VALUE);
        conf.setManagedLedgerCacheEvictionTimeThresholdMillis(Long.MAX_VALUE);
        conf.setCacheEvictionByMarkDeletedPosition(true);
        conf.setManagedLedgerCacheSizeMB(cacheEnable ? 1 : 0);
        setup();
        String ns = "prop/ns-abc1";
        admin.namespaces().createNamespace(ns);
        String topic = "persistent://" + ns + "/testStorageReadCacheMissesRate" + UUID.randomUUID();

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer().enableBatching(false).topic(topic).create();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("test")
                .subscribe();
        byte[] msg = new byte[2 * 1024 * 1024];
        new Random().nextBytes(msg);
        producer.send(msg);
        consumer.receive();
        // when cacheEnable, the second msg will read cache miss
        producer.send(msg);
        consumer.receive();

        PersistentTopic persistentTopic =
                (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
        ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) persistentTopic.getManagedLedger());
        // Refresh the managed ledger stats explicitly; the periodic refresh interval was set to
        // Integer.MAX_VALUE seconds above.
        managedLedger.getMbean().refreshStats(1, TimeUnit.SECONDS);

        // Same expectation for every metric checked below.
        final double expectedMissesRate = cacheEnable ? 1.0 : 2.0;

        // includeTopicMetric true
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);

        metrics.entries().forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));

        List<Metric> cm = (List<Metric>) metrics.get("pulsar_storage_read_cache_misses_rate");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).value, expectedMissesRate);
        assertEquals(cm.get(0).tags.get("topic"), topic);
        assertEquals(cm.get(0).tags.get("namespace"), ns);
        assertEquals(cm.get(0).tags.get("cluster"), "test");

        List<Metric> brokerMetric = (List<Metric>) metrics.get("pulsar_broker_storage_read_cache_misses_rate");
        assertEquals(brokerMetric.size(), 1);
        assertEquals(brokerMetric.get(0).value, expectedMissesRate);
        assertEquals(brokerMetric.get(0).tags.get("cluster"), "test");
        assertNull(brokerMetric.get(0).tags.get("namespace"));
        assertNull(brokerMetric.get(0).tags.get("topic"));

        // includeTopicMetric false
        ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut2);
        String metricsStr2 = statsOut2.toString();
        Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);

        metrics2.entries().forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));

        List<Metric> cm2 = (List<Metric>) metrics2.get("pulsar_storage_read_cache_misses_rate");
        assertEquals(cm2.size(), 1);
        assertEquals(cm2.get(0).value, expectedMissesRate);
        assertNull(cm2.get(0).tags.get("topic"));
        assertEquals(cm2.get(0).tags.get("namespace"), ns);
        assertEquals(cm2.get(0).tags.get("cluster"), "test");

        // Read from the second scrape (metrics2) so the broker-level metric is actually
        // verified for the includeTopicMetric=false output.
        List<Metric> brokerMetric2 = (List<Metric>) metrics2.get("pulsar_broker_storage_read_cache_misses_rate");
        assertEquals(brokerMetric2.size(), 1);
        assertEquals(brokerMetric2.get(0).value, expectedMissesRate);
        assertEquals(brokerMetric2.get(0).tags.get("cluster"), "test");
        assertNull(brokerMetric2.get(0).tags.get("namespace"));
        assertNull(brokerMetric2.get(0).tags.get("topic"));

        // test ManagedLedgerMetrics
        List<Metric> mlMetric = ((List<Metric>) metrics.get("pulsar_ml_ReadEntriesOpsCacheMissesRate"));
        assertEquals(mlMetric.size(), 1);
        assertEquals(mlMetric.get(0).value, expectedMissesRate);
        assertEquals(mlMetric.get(0).tags.get("cluster"), "test");
        assertEquals(mlMetric.get(0).tags.get("namespace"), ns + "/persistent");
    }

    /**
     * Verifies the per-topic subscription expiry metrics
     * ({@code pulsar_subscription_last_expire_timestamp}, {@code pulsar_subscription_msg_rate_expired} and
     * {@code pulsar_subscription_total_msg_expired}).
     *
     * <p>Two topics each get a subscription whose consumer is closed right after subscribing, then
     * {@code messages} messages are published to each topic. The TTL values are overridden to {@code -1}
     * and an expiry check is triggered on every topic; the test waits until both subscriptions report a
     * non-zero expired-message rate before generating and parsing the metrics.
     *
     * @throws Exception if any broker, client or reflection call fails
     */
    @Test
    public void testPerTopicExpiredStat() throws Exception {
        String ns = "prop/ns-abc1";
        admin.namespaces().createNamespace(ns);
        String topic1 = "persistent://" + ns + "/testPerTopicExpiredStat1";
        String topic2 = "persistent://" + ns + "/testPerTopicExpiredStat2";
        // The assertions below index the parsed samples in this order: topic2 first, then topic1.
        List<String> topicList = Arrays.asList(topic2, topic1);
        Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create();
        final String subName = "test";
        // Create the subscription on each topic, then disconnect its consumer immediately.
        for (String topic : topicList) {
            pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(subName)
                    .subscribe().close();
        }

        final int messages = 10;

        for (int i = 0; i < messages; i++) {
            String message = "my-message-" + i;
            p1.send(message.getBytes());
            p2.send(message.getBytes());
        }

        p1.close();
        p2.close();
        // Let the message expire
        for (String topic : topicList) {
            PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
            persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
            persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1);
        }
        pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
        //wait for checkMessageExpiry
        PersistentSubscription sub = (PersistentSubscription)
                pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName);
        PersistentSubscription sub2 = (PersistentSubscription)
                pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName);
        Awaitility.await().until(() -> sub.getExpiredMessageRate() != 0.0);
        Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0);

        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
        // There should be 2 metrics with different tags for each topic
        List<Metric> cm = (List<Metric>) metrics.get("pulsar_subscription_last_expire_timestamp");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), topic2);
        assertEquals(cm.get(0).tags.get("namespace"), ns);
        assertEquals(cm.get(1).tags.get("topic"), topic1);
        assertEquals(cm.get(1).tags.get("namespace"), ns);

        //check value
        Field field = PersistentSubscription.class.getDeclaredField("lastExpireTimestamp");
        field.setAccessible(true);
        for (int i = 0; i < topicList.size(); i++) {
            PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                    .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
            // TestNG order is (actual, expected): the reported metric is the value under test.
            assertEquals((long) cm.get(i).value, (long) field.get(subscription));
        }

        cm = (List<Metric>) metrics.get("pulsar_subscription_msg_rate_expired");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), topic2);
        assertEquals(cm.get(0).tags.get("namespace"), ns);
        assertEquals(cm.get(1).tags.get("topic"), topic1);
        assertEquals(cm.get(1).tags.get("namespace"), ns);
        //check value
        field = PersistentSubscription.class.getDeclaredField("expiryMonitor");
        field.setAccessible(true);
        // The formatted text is parsed back with Double.valueOf, so it must use '.' as the decimal
        // separator and no grouping separators, whatever the JVM default locale is.
        NumberFormat nf = NumberFormat.getNumberInstance(java.util.Locale.ROOT);
        nf.setGroupingUsed(false);
        nf.setMaximumFractionDigits(3);
        nf.setRoundingMode(RoundingMode.DOWN);
        for (int i = 0; i < topicList.size(); i++) {
            PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                    .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
            PersistentMessageExpiryMonitor monitor = (PersistentMessageExpiryMonitor) field.get(subscription);
            assertEquals(cm.get(i).value, Double.valueOf(nf.format(monitor.getMessageExpiryRate())).doubleValue());
        }

        cm = (List<Metric>) metrics.get("pulsar_subscription_total_msg_expired");
        assertEquals(cm.size(), 2);
        assertEquals(cm.get(0).tags.get("topic"), topic2);
        assertEquals(cm.get(0).tags.get("namespace"), ns);
        assertEquals(cm.get(1).tags.get("topic"), topic1);
        assertEquals(cm.get(1).tags.get("namespace"), ns);
        //check value
        for (int i = 0; i < topicList.size(); i++) {
            assertEquals((long) cm.get(i).value, messages);
        }

    }

    /**
     * Verifies that the bundle-level ({@code pulsar_bundle_*}) and load-balancer ({@code pulsar_lb_*})
     * metrics are present in the generated Prometheus output.
     *
     * <p>Some traffic is produced and consumed on one topic, an extra broker node is mocked in ZooKeeper,
     * and the broker rates and local load data are refreshed before the metrics are generated.
     *
     * @throws Exception if any broker, client or ZooKeeper call fails
     */
    @Test
    public void testBundlesMetrics() throws Exception {
        final String topic = "persistent://my-property/use/my-ns/my-topic1";
        // Mock another broker to make split task work.
        final String mockedBroker = "/loadbalance/brokers/127.0.0.1:0";
        final int messages = 10;

        // try-with-resources closes the producer and consumer even when an assertion fails.
        try (Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic).create();
             Consumer<byte[]> c1 = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe()) {
            for (int i = 0; i < messages; i++) {
                String message = "my-message-" + i;
                p1.send(message.getBytes());
            }

            for (int i = 0; i < messages; i++) {
                c1.acknowledge(c1.receive());
            }

            // Created outside the inner try so the finally block only deletes a node that exists.
            mockZooKeeper.create(mockedBroker, new byte[]{0}, Collections.emptyList(), CreateMode.EPHEMERAL);
            try {
                pulsar.getBrokerService().updateRates();
                Awaitility.await().untilAsserted(
                        () -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
                ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper) pulsar.getLoadManager().get();
                loadManager.getLoadManager().updateLocalBrokerData();

                ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
                PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
                String metricsStr = statsOut.toString();
                Multimap<String, Metric> metrics = parseMetrics(metricsStr);
                assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_in"));
                assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_out"));
                assertTrue(metrics.containsKey("pulsar_bundle_topics_count"));
                assertTrue(metrics.containsKey("pulsar_bundle_consumer_count"));
                assertTrue(metrics.containsKey("pulsar_bundle_producer_count"));
                assertTrue(metrics.containsKey("pulsar_bundle_msg_throughput_in"));
                assertTrue(metrics.containsKey("pulsar_bundle_msg_throughput_out"));

                assertTrue(metrics.containsKey("pulsar_lb_cpu_usage"));
                assertTrue(metrics.containsKey("pulsar_lb_memory_usage"));
                assertTrue(metrics.containsKey("pulsar_lb_directMemory_usage"));
                assertTrue(metrics.containsKey("pulsar_lb_bandwidth_in_usage"));
                assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));

                assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
            } finally {
                // cleanup: remove the mocked broker node on failure too, so it cannot outlive this test.
                mockZooKeeper.delete(mockedBroker, 0);
            }
        }
    }

    /**
     * Verifies that the subscription-level metrics are reported for a subscription on a non-persistent
     * topic after {@code messages} messages have been produced, received and acknowledged.
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testNonPersistentSubMetrics() throws Exception {
        final String topic = "non-persistent://my-property/use/my-ns/my-topic1";
        final int messages = 100;

        // try-with-resources closes the producer and consumer even when an assertion fails.
        try (Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic).create();
             Consumer<byte[]> c1 = pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe()) {

            for (int i = 0; i < messages; i++) {
                String message = "my-message-" + i;
                p1.send(message.getBytes());
            }

            for (int i = 0; i < messages; i++) {
                c1.acknowledge(c1.receive());
            }

            ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
            PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
            String metricsStr = statsOut.toString();
            Multimap<String, Metric> metrics = parseMetrics(metricsStr);

            List<String> expectedMetrics = Arrays.asList(
                    "pulsar_subscription_back_log",
                    "pulsar_subscription_back_log_no_delayed",
                    "pulsar_subscription_msg_throughput_out",
                    "pulsar_throughput_out",
                    "pulsar_subscription_msg_rate_redeliver",
                    "pulsar_subscription_unacked_messages",
                    "pulsar_subscription_blocked_on_unacked_messages",
                    "pulsar_subscription_msg_rate_out",
                    "pulsar_out_bytes_total",
                    "pulsar_out_messages_total",
                    "pulsar_subscription_last_expire_timestamp",
                    "pulsar_subscription_msg_drop_rate",
                    "pulsar_subscription_consumers_count");
            for (String metricName : expectedMetrics) {
                // Name the metric in the failure message so a missing one is identifiable.
                assertTrue(metrics.containsKey(metricName), "Missing metric: " + metricName);
            }
        }
    }

    /**
     * Produces and consumes messages on two topics of the same namespace and checks that each inspected
     * metric is reported exactly once, labelled with that namespace.
     *
     * <p>NOTE(review): all three boolean flags passed to the generator are {@code false}; presumably this
     * disables topic-, consumer- and producer-level metrics, which is why one aggregated sample per
     * namespace is expected — confirm against {@code PrometheusMetricsGenerator}.
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testPerNamespaceStats() throws Exception {
        final String firstTopic = "persistent://my-property/use/my-ns/my-topic1";
        final String secondTopic = "persistent://my-property/use/my-ns/my-topic2";
        final String expectedNamespace = "my-property/use/my-ns";
        final int messages = 10;

        Producer<byte[]> p1 = pulsarClient.newProducer().topic(firstTopic).create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic(secondTopic).create();

        Consumer<byte[]> c1 = pulsarClient.newConsumer()
                .topic(firstTopic)
                .subscriptionName("test")
                .subscribe();
        Consumer<byte[]> c2 = pulsarClient.newConsumer()
                .topic(secondTopic)
                .subscriptionName("test")
                .subscribe();

        for (int sent = 0; sent < messages; sent++) {
            String text = "my-message-" + sent;
            p1.send(text.getBytes());
            p2.send(text.getBytes());
        }

        for (int received = 0; received < messages; received++) {
            c1.acknowledge(c1.receive());
            c2.acknowledge(c2.receive());
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
        Multimap<String, Metric> metrics = parseMetrics(out.toString());

        // Dump every parsed sample to ease debugging of failures.
        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // One sample per namespace, and it must not carry a topic label.
        for (String metricName : Arrays.asList("pulsar_storage_write_latency_le_1", "pulsar_producers_count")) {
            List<Metric> samples = (List<Metric>) metrics.get(metricName);
            assertEquals(samples.size(), 1);
            assertNull(samples.get(0).tags.get("topic"));
            assertEquals(samples.get(0).tags.get("namespace"), expectedNamespace);
        }

        // One sample per namespace; only the namespace label is checked for the counters.
        for (String metricName : Arrays.asList("pulsar_in_bytes_total", "pulsar_in_messages_total",
                "pulsar_out_bytes_total", "pulsar_out_messages_total")) {
            List<Metric> samples = (List<Metric>) metrics.get(metricName);
            assertEquals(samples.size(), 1);
            assertEquals(samples.get(0).tags.get("namespace"), expectedNamespace);
        }

        p1.close();
        p2.close();
        c1.close();
        c2.close();
    }

    /**
     * Checks the per-producer metrics {@code pulsar_producer_msg_rate_in} and
     * {@code pulsar_producer_msg_throughput_in}: with two named producers on two topics, each metric must
     * have two samples carrying the namespace, topic, producer name and producer id labels.
     *
     * <p>The samples are expected in the order topic2/producer2 first, then topic1/producer1.
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testPerProducerStats() throws Exception {
        final String namespace = "my-property/use/my-ns";
        final String topicOne = "persistent://my-property/use/my-ns/my-topic1";
        final String topicTwo = "persistent://my-property/use/my-ns/my-topic2";
        final int messages = 10;

        Producer<byte[]> p1 = pulsarClient.newProducer().topic(topicOne)
                .producerName("producer1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic(topicTwo)
                .producerName("producer2").create();

        Consumer<byte[]> c1 = pulsarClient.newConsumer()
                .topic(topicOne)
                .subscriptionName("Test")
                .subscribe();
        Consumer<byte[]> c2 = pulsarClient.newConsumer()
                .topic(topicTwo)
                .subscriptionName("Test")
                .subscribe();

        for (int sent = 0; sent < messages; sent++) {
            String text = "my-message-" + sent;
            p1.send(text.getBytes());
            p2.send(text.getBytes());
        }

        for (int received = 0; received < messages; received++) {
            c1.acknowledge(c1.receive());
            c2.acknowledge(c2.receive());
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, true, out);
        Multimap<String, Metric> metrics = parseMetrics(out.toString());

        // Dump every parsed sample to ease debugging of failures.
        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // Both producer metrics must expose the same pair of label sets, in the same order.
        for (String metricName : Arrays.asList("pulsar_producer_msg_rate_in", "pulsar_producer_msg_throughput_in")) {
            List<Metric> samples = (List<Metric>) metrics.get(metricName);
            assertEquals(samples.size(), 2);

            Metric first = samples.get(0);
            assertEquals(first.tags.get("namespace"), namespace);
            assertEquals(first.tags.get("topic"), topicTwo);
            assertEquals(first.tags.get("producer_name"), "producer2");
            assertEquals(first.tags.get("producer_id"), "1");

            Metric second = samples.get(1);
            assertEquals(second.tags.get("namespace"), namespace);
            assertEquals(second.tags.get("topic"), topicOne);
            assertEquals(second.tags.get("producer_name"), "producer1");
            assertEquals(second.tags.get("producer_id"), "0");
        }

        p1.close();
        p2.close();
        c1.close();
        c2.close();
    }

    /**
     * Checks the labels on {@code pulsar_out_bytes_total} and {@code pulsar_out_messages_total}: with one
     * consumer on each of two topics, each metric must have four samples. For every topic there is one
     * sample checked for namespace, topic and subscription, followed by one that is additionally checked
     * for its {@code consumer_id}.
     *
     * <p>The samples are expected in the order topic2 (two samples) first, then topic1 (two samples).
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testPerConsumerStats() throws Exception {
        final String namespace = "my-property/use/my-ns";
        final String topicOne = "persistent://my-property/use/my-ns/my-topic1";
        final String topicTwo = "persistent://my-property/use/my-ns/my-topic2";
        final int messages = 10;

        Producer<byte[]> p1 = pulsarClient.newProducer().topic(topicOne).create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic(topicTwo).create();

        Consumer<byte[]> c1 = pulsarClient.newConsumer()
                .topic(topicOne)
                .subscriptionName("test")
                .subscribe();
        Consumer<byte[]> c2 = pulsarClient.newConsumer()
                .topic(topicTwo)
                .subscriptionName("test")
                .subscribe();

        for (int sent = 0; sent < messages; sent++) {
            String text = "my-message-" + sent;
            p1.send(text.getBytes());
            p2.send(text.getBytes());
        }

        for (int received = 0; received < messages; received++) {
            c1.acknowledge(c1.receive());
            c2.acknowledge(c2.receive());
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, true, false, out);
        Multimap<String, Metric> metrics = parseMetrics(out.toString());

        // Dump every parsed sample to ease debugging of failures.
        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // Expected labels per sample position; a null consumer id means that label is not checked.
        final String[] expectedTopics = {topicTwo, topicTwo, topicOne, topicOne};
        final String[] expectedConsumerIds = {null, "1", null, "0"};

        for (String metricName : Arrays.asList("pulsar_out_bytes_total", "pulsar_out_messages_total")) {
            List<Metric> samples = (List<Metric>) metrics.get(metricName);
            assertEquals(samples.size(), 4);
            for (int idx = 0; idx < expectedTopics.length; idx++) {
                Metric sample = samples.get(idx);
                assertEquals(sample.tags.get("namespace"), namespace);
                assertEquals(sample.tags.get("topic"), expectedTopics[idx]);
                assertEquals(sample.tags.get("subscription"), "test");
                if (expectedConsumerIds[idx] != null) {
                    assertEquals(sample.tags.get("consumer_id"), expectedConsumerIds[idx]);
                }
            }
        }

        p1.close();
        p2.close();
        c1.close();
        c2.close();
    }

    /**
     * Checks for duplicate type definitions for a metric in the Prometheus metrics output.
     *
     * <p>If the Prometheus parser finds a TYPE definition for the same metric more than once, it errors out:
     * https://github.com/prometheus/prometheus/blob/f04b1b5559a80a4fd1745cf891ce392a056460c9/vendor/github.com/prometheus/common/expfmt/text_parse.go#L499-L502
     * This can happen when including topic metrics, since the same metric is reported multiple times with
     * different labels. For example:
     *
     * <pre>
     * # TYPE pulsar_subscriptions_count gauge
     * pulsar_subscriptions_count{cluster="standalone"} 0
     * pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/metadata"} 1.0
     * pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/coordinate"} 1.0
     * pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/assignments"} 1.0
     * </pre>
     *
     * <p>The test also fails when a TYPE line appears after the first sample of its metric, or when a
     * sample has neither its own TYPE line nor one for its name with a known Prometheus suffix removed.
     * It restarts the test environment with the transaction coordinator and batched transaction writes
     * enabled, and runs twice to make sure types are present when generated multiple times.
     *
     * @throws Exception if the environment restart, a client call or the metrics generation fails
     */
    @Test(invocationCount = 2)
    public void testDuplicateMetricTypeDefinitions() throws Exception {
        cleanup();
        conf.setTransactionCoordinatorEnabled(true);
        conf.setTransactionLogBatchedWriteEnabled(true);
        conf.setTransactionPendingAckBatchedWriteEnabled(true);
        try {
            setup();

            Set<String> allPrometheusSuffixString = allPrometheusSuffixEnums();
            // try-with-resources closes the producers even when an assertion fails.
            try (Producer<byte[]> p1 =
                         pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
                 Producer<byte[]> p2 =
                         pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create()) {
                for (int i = 0; i < 10; i++) {
                    String message = "my-message-" + i;
                    p1.send(message.getBytes());
                    p2.send(message.getBytes());
                }

                ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
                PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
                String metricsStr = statsOut.toString();

                Map<String, String> typeDefs = new HashMap<>();
                Map<String, String> metricNames = new HashMap<>();

                Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
                Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");

                Splitter.on("\n").split(metricsStr).forEach(line -> {
                    if (line.isEmpty()) {
                        return;
                    }
                    if (line.startsWith("#")) {
                        // Check for duplicate type definitions
                        Matcher typeMatcher = typePattern.matcher(line);
                        checkArgument(typeMatcher.matches());
                        String metricName = typeMatcher.group(1);
                        String type = typeMatcher.group(2);

                        // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
                        // "Only one TYPE line may exist for a given metric name."
                        if (!typeDefs.containsKey(metricName)) {
                            typeDefs.put(metricName, type);
                        } else {
                            // Dump the output before failing: fail() throws, so nothing after it runs.
                            System.out.println(metricsStr);
                            fail("Duplicate type definition found for TYPE definition " + metricName);
                        }
                        // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
                        // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
                        if (metricNames.containsKey(metricName)) {
                            System.out.println(metricsStr);
                            fail("TYPE definition for " + metricName + " appears after first sample");
                        }
                    } else {
                        Matcher metricMatcher = metricNamePattern.matcher(line);
                        checkArgument(metricMatcher.matches());
                        String metricName = metricMatcher.group(1);
                        metricNames.put(metricName, metricName);
                    }
                });

                // Metrics with no type definition
                for (String metricName : metricNames.keySet()) {
                    if (typeDefs.containsKey(metricName)) {
                        continue;
                    }
                    // This may be OK if this is a _sum or _count metric from a summary
                    boolean isNorm = false;
                    for (String suffix : allPrometheusSuffixString) {
                        if (metricName.endsWith(suffix)) {
                            // Strip only the trailing suffix; indexOf(suffix) would cut at an earlier
                            // occurrence of the same text inside the metric name.
                            String summaryMetricName =
                                    metricName.substring(0, metricName.length() - suffix.length());
                            if (!typeDefs.containsKey(summaryMetricName)) {
                                fail("Metric " + metricName + " does not have a corresponding summary type definition");
                            }
                            isNorm = true;
                            break;
                        }
                    }
                    if (!isNorm) {
                        fail("Metric " + metricName + " does not have a type definition");
                    }
                }
            }
        } finally {
            // Reset the flags even on failure so they do not stay enabled on the shared configuration.
            conf.setTransactionCoordinatorEnabled(false);
            conf.setTransactionLogBatchedWriteEnabled(false);
            conf.setTransactionPendingAckBatchedWriteEnabled(false);
        }
    }

    /**
     * Collects the name suffixes that the Prometheus client appends to a metric family name.
     *
     * <p>For every {@link Collector.Type} a metric family with a placeholder name is created, its sample
     * names are read back, the placeholder prefix is cut off and blank remainders are dropped.
     *
     * <p>Per the original author's note the result is expected to be
     * ["_sum", "_info", "_bucket", "_count", "_total", "_created", "_gsum", "_gcount"]; the exact contents
     * depend on the Prometheus client version in use.
     *
     * @return the distinct, non-blank suffixes
     */
    public static Set<String> allPrometheusSuffixEnums(){
        final String placeholderName = "123";
        return Arrays.stream(Collector.Type.values())
                .map(type -> new Collector.MetricFamilySamples(placeholderName, type, "", new ArrayList<>()))
                .flatMap(family -> Arrays.stream(family.getNames()))
                .map(fullName -> fullName.substring(placeholderName.length()))
                .filter(StringUtils::isNotBlank)
                .collect(Collectors.toSet());
    }


    /**
     * Checks that the managed-ledger cache metrics {@code pulsar_ml_cache_evictions} and
     * {@code pulsar_ml_cache_hits_rate} are each reported exactly once, tagged with cluster "test",
     * after messages have been published to two topics.
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testManagedLedgerCacheStats() throws Exception {
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        for (int sent = 0; sent < 10; sent++) {
            String text = "my-message-" + sent;
            p1.send(text.getBytes());
            p2.send(text.getBytes());
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
        Multimap<String, Metric> metrics = parseMetrics(out.toString());

        // Dump every parsed sample to ease debugging of failures.
        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        for (String metricName : Arrays.asList("pulsar_ml_cache_evictions", "pulsar_ml_cache_hits_rate")) {
            List<Metric> samples = (List<Metric>) metrics.get(metricName);
            assertEquals(samples.size(), 1);
            assertEquals(samples.get(0).tags.get("cluster"), "test");
        }

        p1.close();
        p2.close();
    }

    /**
     * Publishes to two topics in each of two namespaces, then checks the generated output in two ways:
     * the TYPE lines must be unique per metric and precede the metric's first sample, and the
     * managed-ledger metrics {@code pulsar_ml_AddEntryBytesRate} and {@code pulsar_ml_AddEntryMessagesRate}
     * must each have two samples, the first of which is tagged with cluster "test" and one of the two
     * namespaces.
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testManagedLedgerStats() throws Exception {
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic1").create();
        Producer<byte[]> p4 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic2").create();
        for (int sent = 0; sent < 10; sent++) {
            String text = "my-message-" + sent;
            p1.send(text.getBytes());
            p2.send(text.getBytes());
            p3.send(text.getBytes());
            p4.send(text.getBytes());
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
        String metricsStr = out.toString();

        Multimap<String, Metric> metrics = parseMetrics(metricsStr);

        // Dump every parsed sample to ease debugging of failures.
        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        Map<String, String> typeDefs = new HashMap<>();
        Set<String> sampledMetrics = new HashSet<>();

        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");

        for (String line : Splitter.on("\n").split(metricsStr)) {
            if (line.isEmpty()) {
                continue;
            }
            if (!line.startsWith("#")) {
                // Sample line: remember that this metric name has already been reported.
                Matcher sampleMatcher = metricNamePattern.matcher(line);
                checkArgument(sampleMatcher.matches());
                sampledMetrics.add(sampleMatcher.group(1));
                continue;
            }

            // TYPE line: check for duplicate type definitions
            Matcher typeMatcher = typePattern.matcher(line);
            checkArgument(typeMatcher.matches());
            String metricName = typeMatcher.group(1);
            String type = typeMatcher.group(2);

            // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
            // "Only one TYPE line may exist for a given metric name."
            if (typeDefs.containsKey(metricName)) {
                fail("Duplicate type definition found for TYPE definition " + metricName);
            } else {
                typeDefs.put(metricName, type);
            }
            // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
            // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
            if (sampledMetrics.contains(metricName)) {
                fail("TYPE definition for " + metricName + " appears after first sample");
            }
        }

        // Two namespaces were written to, so two samples per metric; only the first one's tags are checked.
        for (String metricName : Arrays.asList("pulsar_ml_AddEntryBytesRate", "pulsar_ml_AddEntryMessagesRate")) {
            List<Metric> samples = (List<Metric>) metrics.get(metricName);
            assertEquals(samples.size(), 2);
            assertEquals(samples.get(0).tags.get("cluster"), "test");
            String ns = samples.get(0).tags.get("namespace");
            assertTrue(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"));
        }

        p1.close();
        p2.close();
        p3.close();
        p4.close();
    }

    /**
     * Checks the managed-ledger BookKeeper client scheduler metrics. Each metric is looked up by name
     * fragments via {@code keyNameBySubstrings}; the scheduler threads and max queue size metrics must
     * have one sample and the task execution sum must have two, with the first sample of each tagged
     * with cluster "test".
     *
     * @throws Exception if any client call or the metrics generation fails
     */
    @Test
    public void testManagedLedgerBookieClientStats() throws Exception {
        @Cleanup
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();

        @Cleanup
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        for (int sent = 0; sent < 10; sent++) {
            String text = "my-message-" + sent;
            p1.send(text.getBytes());
            p2.send(text.getBytes());
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
        Multimap<String, Metric> metrics = parseMetrics(out.toString());

        // Dump every parsed sample to ease debugging of failures.
        for (Map.Entry<String, Metric> entry : metrics.entries()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // Name fragment of each metric, paired by position with its expected number of samples.
        final String[] nameFragments = {
                "bookkeeper_ml_scheduler_threads",
                "bookkeeper_ml_scheduler_task_execution_sum",
                "bookkeeper_ml_scheduler_max_queue_size",
        };
        final int[] expectedSampleCounts = {1, 2, 1};

        for (int idx = 0; idx < nameFragments.length; idx++) {
            String key = keyNameBySubstrings(metrics, "pulsar_managedLedger_client", nameFragments[idx]);
            List<Metric> samples = (List<Metric>) metrics.get(key);
            assertEquals(samples.size(), expectedSampleCounts[idx]);
            assertEquals(samples.get(0).tags.get("cluster"), "test");
        }
    }

    /**
     * Finds the first metric name that contains every one of the given substrings.
     *
     * @param metrics    parsed metrics, keyed by metric name
     * @param substrings fragments that must all occur in the metric name; with no fragments the first
     *                   key (if any) matches
     * @return the first matching key in the multimap's iteration order, or {@code null} if none matches
     */
    private static String keyNameBySubstrings(Multimap<String, Metric> metrics, String... substrings) {
        // keySet() visits each distinct metric name once; keys() repeats a name once per sample.
        for (String key : metrics.keySet()) {
            if (Arrays.stream(substrings).allMatch(key::contains)) {
                return key;
            }
        }
        return null;
    }

    /**
     * Checks that token authentication outcomes are exported as Prometheus metrics: one rejected attempt
     * (no credentials) and one accepted attempt (valid token) must be visible in
     * {@code pulsar_authentication_failures_total} and {@code pulsar_authentication_success_total}.
     */
    @Test
    public void testAuthMetrics() throws IOException, AuthenticationException {
        SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

        Properties properties = new Properties();
        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));

        ServiceConfiguration conf = new ServiceConfiguration();
        conf.setProperties(properties);

        AuthenticationProviderToken provider = new AuthenticationProviderToken();
        try {
            provider.initialize(conf);

            try {
                provider.authenticate(new AuthenticationDataSource() {
                });
                fail("Should have failed");
            } catch (AuthenticationException e) {
                // expected, no credential passed
            }

            String token = AuthTokenUtils.createToken(secretKey, "subject", Optional.empty());

            // Pulsar protocol auth
            String subject = provider.authenticate(new AuthenticationDataSource() {
                @Override
                public boolean hasDataFromCommand() {
                    return true;
                }

                @Override
                public String getCommandData() {
                    return token;
                }
            });
            // The token was created for "subject", so a successful authentication must resolve to it.
            assertEquals(subject, "subject");

            ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
            PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
            Multimap<String, Metric> metrics = parseMetrics(statsOut.toString());
            final String providerName = provider.getClass().getSimpleName();

            Collection<Metric> successes = metrics.get("pulsar_authentication_success_total");
            boolean haveSucceed = successes.stream().anyMatch(metric ->
                    Objects.equals(metric.tags.get("auth_method"), "token")
                            && Objects.equals(metric.tags.get("provider_name"), providerName));
            assertTrue(haveSucceed, "no success sample for the token provider in " + successes);

            Collection<Metric> failures = metrics.get("pulsar_authentication_failures_total");
            boolean haveFailed = failures.stream().anyMatch(metric ->
                    Objects.equals(metric.tags.get("auth_method"), "token")
                            && Objects.equals(metric.tags.get("reason"),
                            AuthenticationProviderToken.ErrorCode.INVALID_AUTH_DATA.name())
                            && Objects.equals(metric.tags.get("provider_name"), providerName));
            assertTrue(haveFailed, "no INVALID_AUTH_DATA failure sample for the token provider in " + failures);
        } finally {
            // Release the provider even when an assertion fails, as the sibling token tests do on success.
            provider.close();
        }
    }

    /**
     * Authenticating with a token whose expiry lies one hour in the past must be rejected, and the
     * generated Prometheus output must then contain exactly one {@code pulsar_expired_token_total} sample.
     */
    @Test
    public void testExpiredTokenMetrics() throws Exception {
        SecretKey key = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

        AuthenticationProviderToken provider = new AuthenticationProviderToken();

        Properties providerProps = new Properties();
        providerProps.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(key));
        ServiceConfiguration providerConf = new ServiceConfiguration();
        providerConf.setProperties(providerProps);
        provider.initialize(providerConf);

        long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
        String expiredToken = AuthTokenUtils.createToken(key, "subject", Optional.of(new Date(oneHourAgo)));

        // Presents the expired token as command data.
        AuthenticationDataSource expiredCredentials = new AuthenticationDataSource() {
            @Override
            public boolean hasDataFromCommand() {
                return true;
            }

            @Override
            public String getCommandData() {
                return expiredToken;
            }
        };

        try {
            provider.authenticate(expiredCredentials);
            fail("Should have failed");
        } catch (AuthenticationException e) {
            // expected, token was expired
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
        Multimap<String, Metric> parsed = parseMetrics(out.toString());
        List<Metric> expiredSamples = (List<Metric>) parsed.get("pulsar_expired_token_total");
        assertEquals(expiredSamples.size(), 1);

        provider.close();
    }

    /**
     * Authenticates with five tokens that expire 3, 7, 40, 100 and 400 minutes from now and checks the
     * {@code pulsar_expiring_token_minutes} histogram: the count must equal the number of tokens, and the
     * five bucket samples must carry the cumulative counts this test expects for each {@code le} bound.
     */
    @Test
    public void testExpiringTokenMetrics() throws Exception {
        SecretKey key = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);

        AuthenticationProviderToken provider = new AuthenticationProviderToken();

        Properties providerProps = new Properties();
        providerProps.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(key));
        ServiceConfiguration providerConf = new ServiceConfiguration();
        providerConf.setProperties(providerProps);
        provider.initialize(providerConf);

        int[] remainingMinutes = new int[]{3, 7, 40, 100, 400};

        for (int minutes : remainingMinutes) {
            long expiresAt = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(minutes);
            String expiringToken = AuthTokenUtils.createToken(key, "subject", Optional.of(new Date(expiresAt)));
            AuthenticationDataSource credentials = new AuthenticationDataSource() {
                @Override
                public boolean hasDataFromCommand() {
                    return true;
                }

                @Override
                public String getCommandData() {
                    return expiringToken;
                }
            };
            provider.authenticate(credentials);
        }

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, out);
        Multimap<String, Metric> parsed = parseMetrics(out.toString());

        List<Metric> counts = (List<Metric>) parsed.get("pulsar_expiring_token_minutes_count");
        assertEquals(counts.get(0).value, remainingMinutes.length);

        List<Metric> buckets = (List<Metric>) parsed.get("pulsar_expiring_token_minutes_bucket");
        assertEquals(buckets.size(), 5);
        for (Metric bucket : buckets) {
            // Expected cumulative count for this bucket's upper bound; any other bound must hold all 5.
            double expectedCumulative;
            switch (bucket.tags.get("le")) {
                case "5.0":
                    expectedCumulative = 1;
                    break;
                case "10.0":
                    expectedCumulative = 2;
                    break;
                case "60.0":
                    expectedCumulative = 3;
                    break;
                case "240.0":
                    expectedCumulative = 4;
                    break;
                default:
                    expectedCumulative = 5;
                    break;
            }
            assertEquals(bucket.value, expectedCumulative);
        }
        provider.close();
    }

    /** A sample whose value is {@code +Inf} must be parsed as {@link Double#POSITIVE_INFINITY}. */
    @Test
    public void testParsingWithPositiveInfinityValue() {
        String sampleLine = "pulsar_broker_publish_latency{cluster=\"test\",quantile=\"0.0\"} +Inf";
        List<Metric> samples = (List<Metric>) parseMetrics(sampleLine).get("pulsar_broker_publish_latency");
        assertEquals(samples.size(), 1);

        Metric parsed = samples.get(0);
        assertEquals(parsed.tags.get("cluster"), "test");
        assertEquals(parsed.tags.get("quantile"), "0.0");
        assertEquals(parsed.value, Double.POSITIVE_INFINITY);
    }

    /** A sample whose value is {@code -Inf} must be parsed as {@link Double#NEGATIVE_INFINITY}. */
    @Test
    public void testParsingWithNegativeInfinityValue() {
        String sampleLine = "pulsar_broker_publish_latency{cluster=\"test\",quantile=\"0.0\"} -Inf";
        List<Metric> samples = (List<Metric>) parseMetrics(sampleLine).get("pulsar_broker_publish_latency");
        assertEquals(samples.size(), 1);

        Metric parsed = samples.get(0);
        assertEquals(parsed.tags.get("cluster"), "test");
        assertEquals(parsed.tags.get("quantile"), "0.0");
        assertEquals(parsed.value, Double.NEGATIVE_INFINITY);
    }

    /**
     * Checks that {@code pulsar_ml_cursor_persistLedgerSucceed} is exported for the subscription's cursor
     * while {@code exposeManagedCursorMetricsInPrometheus} is enabled, and disappears from the output once
     * the setting is switched off again.
     */
    @Test
    public void testManagedCursorPersistStats() throws Exception {
        final String subName = "my-sub";
        final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
        final int messageSize = 10;

        // @Cleanup closes the producer first and then the consumer, even when an assertion below fails.
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionType(SubscriptionType.Shared)
                .ackTimeout(1, TimeUnit.SECONDS)
                .subscriptionName(subName)
                .subscribe();

        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .create();
        for (int i = 0; i < messageSize; i++) {
            String message = "my-message-" + i;
            // Explicit charset so the payload does not depend on the platform default encoding.
            producer.send(message.getBytes(StandardCharsets.UTF_8));
            consumer.acknowledge(consumer.receive().getMessageId());
        }

        // enable ExposeManagedCursorMetricsInPrometheus
        pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(true);
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        String metricsStr = statsOut.toString();

        Multimap<String, Metric> metrics = parseMetrics(metricsStr);

        List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_cursor_persistLedgerSucceed");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).tags.get("cluster"), "test");
        assertEquals(cm.get(0).tags.get("cursor_name"), subName);

        // disable ExposeManagedCursorMetricsInPrometheus
        pulsar.getConfiguration().setExposeManagedCursorMetricsInPrometheus(false);
        ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut2);
        String metricsStr2 = statsOut2.toString();
        Multimap<String, Metric> metrics2 = parseMetrics(metricsStr2);
        List<Metric> cm2 = (List<Metric>) metrics2.get("pulsar_ml_cursor_persistLedgerSucceed");
        assertEquals(cm2.size(), 0);
    }

    /**
     * Follows the broker connection counters ({@code pulsar_connection_created_total_count},
     * {@code pulsar_connection_create_success_count}, {@code pulsar_connection_create_fail_count},
     * {@code pulsar_connection_closed_total_count} and {@code pulsar_active_connections}) through three
     * phases: a client connected via a producer, that client closed, and a second client that is rejected
     * after authentication has been enabled on the broker configuration.
     *
     * <p>The expected values depend on the exact order of the client create/close calls below.
     */
    @Test
    public void testBrokerConnection() throws Exception {
        final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";

        // Phase 1: creating a producer is what opens the connection measured below.
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .create();

        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
        List<Metric> cm = (List<Metric>) metrics.get("pulsar_connection_created_total_count");
        compareBrokerConnectionStateCount(cm, 1.0);

        cm = (List<Metric>) metrics.get("pulsar_connection_create_success_count");
        compareBrokerConnectionStateCount(cm, 1.0);

        cm = (List<Metric>) metrics.get("pulsar_connection_closed_total_count");
        compareBrokerConnectionStateCount(cm, 0.0);

        cm = (List<Metric>) metrics.get("pulsar_active_connections");
        compareBrokerConnectionStateCount(cm, 1.0);

        // Phase 2: after closing the client the closed counter is expected to be 1.
        pulsarClient.close();
        statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        metricsStr = statsOut.toString();

        metrics = parseMetrics(metricsStr);
        cm = (List<Metric>) metrics.get("pulsar_connection_closed_total_count");
        compareBrokerConnectionStateCount(cm, 1.0);

        // Phase 3: enable authentication, then connect with a fresh client built without credentials.
        pulsar.getConfiguration().setAuthenticationEnabled(true);

        replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString())
                .operationTimeout(1, TimeUnit.MILLISECONDS));

        try {
            pulsarClient.newProducer()
                    .topic(topicName)
                    .create();
            fail();
        } catch (Exception e) {
            // The producer creation must be rejected with an authentication error.
            assertTrue(e instanceof PulsarClientException.AuthenticationException);
        }

        pulsarClient.close();
        statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        metricsStr = statsOut.toString();

        // Expected totals: two connections created and closed, one success, one failure, none active.
        metrics = parseMetrics(metricsStr);
        cm = (List<Metric>) metrics.get("pulsar_connection_closed_total_count");
        compareBrokerConnectionStateCount(cm, 2.0);

        cm = (List<Metric>) metrics.get("pulsar_connection_create_fail_count");
        compareBrokerConnectionStateCount(cm, 1.0);

        cm = (List<Metric>) metrics.get("pulsar_connection_create_success_count");
        compareBrokerConnectionStateCount(cm, 1.0);

        cm = (List<Metric>) metrics.get("pulsar_active_connections");
        compareBrokerConnectionStateCount(cm, 0.0);

        cm = (List<Metric>) metrics.get("pulsar_connection_created_total_count");
        compareBrokerConnectionStateCount(cm, 2.0);
    }

    /**
     * Asserts that {@code cm} holds exactly one sample, labelled {@code cluster="test"} and
     * {@code broker="localhost"}, whose value equals {@code count}.
     *
     * @param cm    samples parsed for one connection metric
     * @param count expected value of the single sample
     */
    private void compareBrokerConnectionStateCount(List<Metric> cm, double count) {
        assertEquals(cm.size(), 1);

        final Metric sample = cm.get(0);
        final Map<String, String> labels = sample.tags;
        assertEquals(labels.get("cluster"), "test");
        assertEquals(labels.get("broker"), "localhost");
        assertEquals(sample.value, count);
    }

    /**
     * Runs {@code parseMetrics} over the bundled {@code prometheus_metrics_sample.txt} resource; the parser
     * itself asserts that every sample line is well-formed.
     *
     * @throws IOException if the resource cannot be read
     */
    @Test
    void testParseMetrics() throws IOException {
        final String resourceName = "prometheus_metrics_sample.txt";
        // try-with-resources closes the classpath stream, which was previously left open.
        try (java.io.InputStream in = getClass().getClassLoader().getResourceAsStream(resourceName)) {
            // getResourceAsStream returns null for a missing resource; fail with a message that names it.
            Objects.requireNonNull(in, "Test resource not found on classpath: " + resourceName);
            String sampleMetrics = IOUtils.toString(in, StandardCharsets.UTF_8);
            parseMetrics(sampleMetrics);
        }
    }

    /**
     * Checks the {@code pulsar_compaction_*} metrics: none of them may be exported before a compaction has
     * run, and after compacting 1000 keyed messages spread over 10 keys each metric must be exported once
     * with the value asserted below.
     */
    @Test
    public void testCompaction() throws Exception {
        final String topicName = "persistent://my-namespace/use/my-ns/my-compaction1";
        final String[] compactionMetricNames = {
                "pulsar_compaction_removed_event_count",
                "pulsar_compaction_succeed_count",
                "pulsar_compaction_failed_count",
                "pulsar_compaction_duration_time_in_mills",
                "pulsar_compaction_read_throughput",
                "pulsar_compaction_write_throughput",
                "pulsar_compaction_compacted_entries_count",
                "pulsar_compaction_compacted_entries_size",
        };

        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
        // Nothing has been compacted yet, so none of the compaction metrics may be present.
        for (String metricName : compactionMetricNames) {
            assertEquals(metrics.get(metricName).size(), 0, metricName);
        }

        final int numMessages = 1000;
        final int maxKeys = 10;
        // Fixed seed keeps the key sequence, and therefore the asserted values, reproducible.
        Random r = new Random(0);
        for (int j = 0; j < numMessages; j++) {
            int keyIndex = r.nextInt(maxKeys);
            String key = "key" + keyIndex;
            // Explicit charset so the payload size does not depend on the platform default encoding.
            byte[] data = ("my-message-" + key + "-" + j).getBytes(StandardCharsets.UTF_8);
            producer.newMessage()
                    .key(key)
                    .value(data)
                    .send();
        }
        Compactor compactor = ((PulsarCompactionServiceFactory) pulsar.getCompactionServiceFactory()).getCompactor();
        compactor.compact(topicName).get();
        statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
        metricsStr = statsOut.toString();
        metrics = parseMetrics(metricsStr);
        List<Metric> cm = (List<Metric>) metrics.get("pulsar_compaction_removed_event_count");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).value, 990);
        cm = (List<Metric>) metrics.get("pulsar_compaction_succeed_count");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).value, 1);
        cm = (List<Metric>) metrics.get("pulsar_compaction_failed_count");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).value, 0);
        cm = (List<Metric>) metrics.get("pulsar_compaction_duration_time_in_mills");
        assertEquals(cm.size(), 1);
        assertTrue(cm.get(0).value > 0);
        cm = (List<Metric>) metrics.get("pulsar_compaction_read_throughput");
        assertEquals(cm.size(), 1);
        assertTrue(cm.get(0).value > 0);
        cm = (List<Metric>) metrics.get("pulsar_compaction_write_throughput");
        assertEquals(cm.size(), 1);
        assertTrue(cm.get(0).value > 0);
        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_count");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).value, 10);
        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
        assertEquals(cm.size(), 1);
        assertEquals(cm.get(0).value, 840);

        pulsarClient.close();
    }

    /**
     * With {@code isMetricsBufferResponse()} stubbed to {@code true}, two consecutive metric generations
     * must produce identical output whenever {@link TimeWindow} reports the same window start for the
     * timestamps taken before and after the pair of calls.
     *
     * <p>The check is repeated four times, sleeping half a stats period between iterations. Iterations in
     * which the two timestamps map to different window starts are skipped rather than failed.
     */
    @Test
    public void testMetricsWithCache() throws Throwable {
        ServiceConfiguration configuration = Mockito.mock(ServiceConfiguration.class);
        Mockito.when(configuration.getManagedLedgerStatsPeriodSeconds()).thenReturn(2);
        Mockito.when(configuration.isMetricsBufferResponse()).thenReturn(true);
        Mockito.when(configuration.getClusterName()).thenReturn(configClusterName);
        Mockito.when(pulsar.getConfiguration()).thenReturn(configuration);

        int period = pulsar.getConfiguration().getManagedLedgerStatsPeriodSeconds();
        TimeWindow<Object> timeWindow = new TimeWindow<>(2, (int) TimeUnit.SECONDS.toMillis(period));

        for (int a = 0; a < 4; a++) {
            long start = System.currentTimeMillis();
            ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
            PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, statsOut1, null);
            ByteArrayOutputStream statsOut2 = new ByteArrayOutputStream();
            PrometheusMetricsGenerator.generate(pulsar, true, false, false, false, statsOut2, null);
            long end = System.currentTimeMillis();

            if (timeWindow.currentWindowStart(start) == timeWindow.currentWindowStart(end)) {
                String metricsStr1 = statsOut1.toString();
                String metricsStr2 = statsOut2.toString();
                assertEquals(metricsStr1, metricsStr2);
                // Parsed for validation only: parseMetrics asserts that every sample line is well-formed.
                parseMetrics(metricsStr1);
            }

            Thread.sleep(TimeUnit.SECONDS.toMillis(period / 2));
        }
    }

    /**
     * Generates metrics with the fifth {@code generate} flag set to {@code true} and checks the
     * {@code partition} label of {@code pulsar_consumers_count}: samples of the namespace holding six
     * non-partitioned topics must carry {@code "-1"}, samples of the namespace holding three 3-partition
     * topics must carry a non-negative partition index.
     */
    @Test
    public void testSplitTopicAndPartitionLabel() throws Exception {
        String ns1 = "prop/ns-abc1";
        String ns2 = "prop/ns-abc2";
        admin.namespaces().createNamespace(ns1);
        admin.namespaces().createNamespace(ns2);
        String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
        String baseTopic2 = "persistent://" + ns2 + "/testMetricsTopicCount";
        for (int i = 0; i < 6; i++) {
            admin.topics().createNonPartitionedTopic(baseTopic1 + UUID.randomUUID());
        }
        for (int i = 0; i < 3; i++) {
            admin.topics().createPartitionedTopic(baseTopic2 + UUID.randomUUID(), 3);
        }
        // @Cleanup closes both consumers even when an assertion below fails.
        @Cleanup
        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                .topicsPattern("persistent://" + ns1 + "/.*")
                .subscriptionName("sub")
                .subscribe();
        @Cleanup
        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                .topicsPattern("persistent://" + ns2 + "/.*")
                .subscriptionName("sub")
                .subscribe();
        @Cleanup
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut);
        String metricsStr = statsOut.toString();
        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
        Collection<Metric> metric = metrics.get("pulsar_consumers_count");
        // 6 non-partitioned topics + 3 topics x 3 partitions = at least 15 samples.
        assertTrue(metric.size() >= 15);
        metric.forEach(item -> {
            if (ns1.equals(item.tags.get("namespace"))) {
                assertEquals(item.tags.get("partition"), "-1");
            }
            if (ns2.equals(item.tags.get("namespace"))) {
                System.out.println(item);
                assertTrue(Integer.parseInt(item.tags.get("partition")) >= 0);
            }
        });
    }

    /**
     * Asserts that {@code cm} contains a single sample tagged with cluster {@code "test"} and broker
     * {@code "localhost"} and that its value is {@code count}.
     *
     * @param cm    samples parsed for one metric name
     * @param count value the single sample must have
     */
    private void compareCompactionStateCount(List<Metric> cm, double count) {
        int sampleCount = cm.size();
        assertEquals(sampleCount, 1);

        String cluster = cm.get(0).tags.get("cluster");
        assertEquals(cluster, "test");

        String broker = cm.get(0).tags.get("broker");
        assertEquals(broker, "localhost");

        double actual = cm.get(0).value;
        assertEquals(actual, count);
    }

    /**
     * Checks that every sample line in the generated output belongs to the metric named by the most
     * recent {@code # TYPE} line above it.
     *
     * <p>Before comparing, a trailing {@code _bucket} is always removed from the sample name, and a
     * trailing {@code _count}, {@code _sum}, {@code _total} or {@code _created} is removed unless the
     * current type definition itself ends with that suffix.
     */
    @Test
    public void testMetricsGroupedByTypeDefinitions() throws Exception {
        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
        for (int i = 0; i < 10; i++) {
            String message = "my-message-" + i;
            // Explicit charset so the payload does not depend on the platform default encoding.
            p1.send(message.getBytes(StandardCharsets.UTF_8));
            p2.send(message.getBytes(StandardCharsets.UTF_8));
        }

        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
        String metricsStr = statsOut.toString();

        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
        // Suffixes that are only stripped when the type definition does not carry them itself.
        final String[] conditionalSuffixes = {"_count", "_sum", "_total", "_created"};

        AtomicReference<String> currentMetric = new AtomicReference<>();
        Splitter.on("\n").split(metricsStr).forEach(line -> {
            if (line.isEmpty()) {
                return;
            }
            if (line.startsWith("#")) {
                // Get the current type definition
                Matcher typeMatcher = typePattern.matcher(line);
                checkArgument(typeMatcher.matches());
                String metricName = typeMatcher.group(1);
                currentMetric.set(metricName);
            } else {
                Matcher metricMatcher = metricNamePattern.matcher(line);
                checkArgument(metricMatcher.matches());
                String metricName = metricMatcher.group(1);

                // Cut the suffix off the END of the name. Cutting at indexOf(suffix) would truncate too
                // early whenever the same text also occurs earlier in the name.
                if (metricName.endsWith("_bucket")) {
                    metricName = metricName.substring(0, metricName.length() - "_bucket".length());
                } else {
                    for (String suffix : conditionalSuffixes) {
                        if (metricName.endsWith(suffix) && !currentMetric.get().endsWith(suffix)) {
                            metricName = metricName.substring(0, metricName.length() - suffix.length());
                            break;
                        }
                    }
                }

                if (!metricName.equals(currentMetric.get())) {
                    System.out.println(metricsStr);
                    fail("Metric not grouped under its type definition: " + line);
                }

            }
        });

        p1.close();
        p2.close();
    }

    /**
     * Minimal parser for the Prometheus text format; only meant to be good enough for unit tests.
     *
     * <p>Empty lines and lines starting with {@code #} are skipped. Every other line must have the form
     * {@code name{labels} value}; a line that does not match fails a TestNG assertion. The values
     * {@code +Inf} and {@code -Inf} (case-insensitive) are mapped to the corresponding infinities, anything
     * else is handed to {@link Double#parseDouble(String)}.
     *
     * @param metrics the complete metrics text, with lines separated by {@code \n}
     * @return the parsed samples, keyed by metric name
     */
    public static Multimap<String, Metric> parseMetrics(String metrics) {
        // Sample lines look like
        //   jvm_threads_current{cluster="standalone",} 203.0
        // or
        //   pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
        //   topic="persistent://public/default/test-2"} 0.0
        final Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)$");
        final Pattern labelPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");

        final Multimap<String, Metric> result = ArrayListMultimap.create();
        for (String line : Splitter.on("\n").split(metrics)) {
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }

            Matcher lineMatcher = pattern.matcher(line);
            assertTrue(lineMatcher.matches(), "line " + line + " does not match pattern " + pattern);

            // group(1) = metric name, group(2) = label block, group(3) = value
            String rawValue = lineMatcher.group(3);
            double parsedValue;
            if (rawValue.equalsIgnoreCase("-Inf")) {
                parsedValue = Double.NEGATIVE_INFINITY;
            } else if (rawValue.equalsIgnoreCase("+Inf")) {
                parsedValue = Double.POSITIVE_INFINITY;
            } else {
                parsedValue = Double.parseDouble(rawValue);
            }

            Metric sample = new Metric();
            sample.value = parsedValue;
            Matcher labelMatcher = labelPattern.matcher(lineMatcher.group(2));
            while (labelMatcher.find()) {
                sample.tags.put(labelMatcher.group(1), labelMatcher.group(2));
            }

            result.put(lineMatcher.group(1), sample);
        }

        return result;
    }

    public static class Metric {
        public Map<String, String> tags = new TreeMap<>();
        public double value;

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
        }
    }

    /**
     * A topic name containing a double quote must appear with the quote backslash-escaped in the
     * {@code topic} label of the single {@code pulsar_subscriptions_count} line.
     */
    @Test
    public void testEscapeLabelValue() throws Exception {
        final String namespace = "prop/ns-abc1";
        admin.namespaces().createNamespace(namespace);
        final String quotedTopic = "persistent://" + namespace + "/\"mytopic";
        admin.topics().createNonPartitionedTopic(quotedTopic);

        @Cleanup
        final Consumer<?> consumer = pulsarClient.newConsumer()
                .subscriptionName("sub")
                .topic(quotedTopic)
                .subscribe();
        @Cleanup
        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);

        final String expectedLine =
                "pulsar_subscriptions_count{cluster=\"test\",namespace=\"prop/ns-abc1\",topic=\"persistent://prop/ns-abc1/\\\"mytopic\"} 1";
        final List<String> subscriptionCountLines = statsOut.toString()
                .lines()
                .filter(outputLine -> outputLine.startsWith("pulsar_subscriptions_count"))
                .collect(Collectors.toList());
        System.out.println(subscriptionCountLines);
        assertEquals(subscriptionCountLines.size(), 1);
        assertEquals(subscriptionCountLines.get(0), expectedLine);
    }

}
