Fix bundle metrics would overwrite loadbalance metrics (#13641)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 08dbb1e..1331c1c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -189,6 +189,8 @@
private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference<>();
// record bundle split metrics
private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference<>();
+ // record bundle metrics
+ private AtomicReference<List<Metrics>> bundleMetrics = new AtomicReference<>();
private long bundleSplitCount = 0;
private long unloadBrokerCount = 0;
@@ -944,7 +946,7 @@
Map<String, String> dimensions = new HashMap<>();
dimensions.put("broker", pulsar.getAdvertisedAddress());
dimensions.put("bundle", bundle);
- dimensions.put("metric", "loadBalancing");
+ dimensions.put("metric", "bundle");
Metrics m = Metrics.create(dimensions);
m.put("brk_bundle_msg_rate_in", stats.msgRateIn);
m.put("brk_bundle_msg_rate_out", stats.msgRateOut);
@@ -955,7 +957,7 @@
m.put("brk_bundle_msg_throughput_out", stats.msgThroughputOut);
metrics.add(m);
}
- this.loadBalancingMetrics.set(metrics);
+ this.bundleMetrics.set(metrics);
}
/**
@@ -1117,6 +1119,10 @@
metricsCollection.addAll(this.bundleSplitMetrics.get());
}
+ if (this.bundleMetrics.get() != null) {
+ metricsCollection.addAll(this.bundleMetrics.get());
+ }
+
return metricsCollection;
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 0b3794e..7c1cfd0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -177,6 +177,7 @@
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
+ this.conf.setExposeBundlesMetricsInPrometheus(true);
}
protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 557478b..a747453 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -57,6 +57,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -342,6 +343,52 @@
}
@Test
+ public void testBundlesMetrics() throws Exception {
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+
+ Consumer<byte[]> c1 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 10;
+
+ for (int i = 0; i < messages; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ }
+
+ for (int i = 0; i < messages; i++) {
+ c1.acknowledge(c1.receive());
+ }
+
+ pulsar.getBrokerService().updateRates();
+ Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
+ ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get();
+ loadManager.getLoadManager().updateLocalBrokerData();
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+ assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_in"));
+ assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_out"));
+ assertTrue(metrics.containsKey("pulsar_bundle_topics_count"));
+ assertTrue(metrics.containsKey("pulsar_bundle_consumer_count"));
+ assertTrue(metrics.containsKey("pulsar_bundle_producer_count"));
+ assertTrue(metrics.containsKey("pulsar_bundle_msg_throughput_in"));
+ assertTrue(metrics.containsKey("pulsar_bundle_msg_throughput_out"));
+
+ assertTrue(metrics.containsKey("pulsar_lb_cpu_usage"));
+ assertTrue(metrics.containsKey("pulsar_lb_memory_usage"));
+ assertTrue(metrics.containsKey("pulsar_lb_directMemory_usage"));
+ assertTrue(metrics.containsKey("pulsar_lb_bandwidth_in_usage"));
+ assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
+
+ assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
+ }
+
+ @Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 5f8fd9e..ce048ab 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -350,7 +350,7 @@
- cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you have configured in the `broker.conf` file.
- broker: broker=${broker}. ${broker} is the IP address of the broker
- bundle: bundle=${bundle}. ${bundle} is the bundle range on this broker
-- metric: metric="loadBalancing".
+- metric: metric="bundle".
| Name | Type | Description |
| --- | --- | --- |