[Transaction] Duplicate TYPE of Prometheus metrics (#13704)

* [Transaction] Fix Duplicate TYPE of Prometheus metrics
### Motivation
Fix duplicate TYPE statements in the metrics output which leads to parsing of Prometheus metrics.
### Modification
Add a map in TransactionAggregator.java, which used for tracking duplicate TYPE definitions.

* Add test

* Fix test and checkstyle

* Use threadLocal

* Use threadLocal

* Fix test and details/*8

* Fix details
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 65399d4..e6ac153 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,6 +20,8 @@
 
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -36,6 +38,17 @@
 @Slf4j
 public class TransactionAggregator {
 
+    /**
+     * Used for tracking duplicate TYPE definitions.
+     */
+    private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition =
+            new FastThreadLocal() {
+                @Override
+                protected Map<String, String> initialValue() {
+                    return new HashMap<>();
+                }
+             };
+
     private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats =
             new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
                 @Override
@@ -54,6 +67,8 @@
 
     public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
         String cluster = pulsar.getConfiguration().getClusterName();
+        Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+        metricWithTypeDefinition.clear();
 
         if (includeTopicMetrics) {
             pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
@@ -146,10 +161,19 @@
                 subscription, managedLedgerStats);
     }
 
+    private static void metricType(SimpleTextOutputStream stream, String name) {
+        Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+        if (!metricWithTypeDefinition.containsKey(name)) {
+            metricWithTypeDefinition.put(name, "gauge");
+            stream.write("# TYPE ").write(name).write(" gauge\n");
+        }
+
+    }
+
     private static void metric(SimpleTextOutputStream stream, String cluster, String name,
                                double value, long coordinatorId) {
-        stream.write("# TYPE ").write(name).write(" gauge\n")
-                .write(name)
+        metricType(stream, name);
+        stream.write(name)
                 .write("{cluster=\"").write(cluster)
                 .write("\",coordinator_id=\"").write(coordinatorId).write("\"} ")
                 .write(value).write(' ').write(System.currentTimeMillis())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 6a4b5c4..7c48e81 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -18,8 +18,14 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -51,10 +57,13 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
+@Slf4j
 public class TransactionMetricsTest extends BrokerTestBase {
 
     @BeforeMethod(alwaysRun = true)
@@ -339,6 +348,72 @@
         checkManagedLedgerMetrics(subName2, 32, metric);
     }
 
+    @Test
+    public void testDuplicateMetricTypeDefinitions() throws Exception{
+        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
+        TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1);
+        pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
+        pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDTwo);
+
+        Awaitility.await().until(() ->
+                pulsar.getTransactionMetadataStoreService().getStores().size() == 2);
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+        Producer<byte[]> p1 = pulsarClient
+                .newProducer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+        Transaction transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.newMessage(transaction)
+                    .value(message.getBytes())
+                    .send();
+        }
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+
+        Map<String, String> typeDefs = new HashMap<>();
+        Map<String, String> metricNames = new HashMap<>();
+        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+
+        Splitter.on("\n").split(metricsStr).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            if (line.startsWith("#")) {
+                // Check for duplicate type definitions
+                Matcher typeMatcher = typePattern.matcher(line);
+                checkArgument(typeMatcher.matches());
+                String metricName = typeMatcher.group(1);
+                String type = typeMatcher.group(2);
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "Only one TYPE line may exist for a given metric name."
+                if (!typeDefs.containsKey(metricName)) {
+                    typeDefs.put(metricName, type);
+                } else {
+                    log.warn(metricsStr);
+                    fail("Duplicate type definition found for TYPE definition " + metricName);
+
+                }
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
+                if (metricNames.containsKey(metricName)) {
+                    log.info(metricsStr);
+                    fail("TYPE definition for " + metricName + " appears after first sample");
+
+                }
+            }
+        });
+
+    }
+
     private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
         boolean exist = false;
         for (PrometheusMetricsTest.Metric metric1 : metrics) {