[Transaction] Duplicate TYPE of Prometheus metrics (#13704)
* [Transaction] Fix Duplicate TYPE of Prometheus metrics
### Motivation
Fix duplicate TYPE statements in the metrics output which leads to parsing of Prometheus metrics.
### Modification
Add a map in TransactionAggregator.java, which used for tracking duplicate TYPE definitions.
* Add test
* Fix test and checkstyle
* Use threadLocal
* Use threadLocal
* Fix test and details/*8
* Fix details
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 65399d4..e6ac153 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,6 +20,8 @@
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -36,6 +38,17 @@
@Slf4j
public class TransactionAggregator {
+ /**
+ * Used for tracking duplicate TYPE definitions.
+ */
+ private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition =
+ new FastThreadLocal() {
+ @Override
+ protected Map<String, String> initialValue() {
+ return new HashMap<>();
+ }
+ };
+
private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats =
new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
@Override
@@ -54,6 +67,8 @@
public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
String cluster = pulsar.getConfiguration().getClusterName();
+ Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+ metricWithTypeDefinition.clear();
if (includeTopicMetrics) {
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
@@ -146,10 +161,19 @@
subscription, managedLedgerStats);
}
+ private static void metricType(SimpleTextOutputStream stream, String name) {
+ Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+ if (!metricWithTypeDefinition.containsKey(name)) {
+ metricWithTypeDefinition.put(name, "gauge");
+ stream.write("# TYPE ").write(name).write(" gauge\n");
+ }
+
+ }
+
private static void metric(SimpleTextOutputStream stream, String cluster, String name,
double value, long coordinatorId) {
- stream.write("# TYPE ").write(name).write(" gauge\n")
- .write(name)
+ metricType(stream, name);
+ stream.write(name)
.write("{cluster=\"").write(cluster)
.write("\",coordinator_id=\"").write(coordinatorId).write("\"} ")
.write(value).write(' ').write(System.currentTimeMillis())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 6a4b5c4..7c48e81 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -18,8 +18,14 @@
*/
package org.apache.pulsar.broker.stats;
+import com.google.common.base.Splitter;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -51,10 +57,13 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+@Slf4j
public class TransactionMetricsTest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@@ -339,6 +348,72 @@
checkManagedLedgerMetrics(subName2, 32, metric);
}
+ @Test
+ public void testDuplicateMetricTypeDefinitions() throws Exception{
+ admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+ TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
+ TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1);
+ pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
+ pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDTwo);
+
+ Awaitility.await().until(() ->
+ pulsar.getTransactionMetadataStoreService().getStores().size() == 2);
+ pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+ Producer<byte[]> p1 = pulsarClient
+ .newProducer()
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+ Transaction transaction = pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ p1.newMessage(transaction)
+ .value(message.getBytes())
+ .send();
+ }
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+ String metricsStr = statsOut.toString();
+
+ Map<String, String> typeDefs = new HashMap<>();
+ Map<String, String> metricNames = new HashMap<>();
+ Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+
+ Splitter.on("\n").split(metricsStr).forEach(line -> {
+ if (line.isEmpty()) {
+ return;
+ }
+ if (line.startsWith("#")) {
+ // Check for duplicate type definitions
+ Matcher typeMatcher = typePattern.matcher(line);
+ checkArgument(typeMatcher.matches());
+ String metricName = typeMatcher.group(1);
+ String type = typeMatcher.group(2);
+ // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+ // "Only one TYPE line may exist for a given metric name."
+ if (!typeDefs.containsKey(metricName)) {
+ typeDefs.put(metricName, type);
+ } else {
+ log.warn(metricsStr);
+ fail("Duplicate type definition found for TYPE definition " + metricName);
+
+ }
+ // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+ // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
+ if (metricNames.containsKey(metricName)) {
+ log.info(metricsStr);
+ fail("TYPE definition for " + metricName + " appears after first sample");
+
+ }
+ }
+ });
+
+ }
+
private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
boolean exist = false;
for (PrometheusMetricsTest.Metric metric1 : metrics) {