Fix bundle metrics would overwrite loadbalance metrics (#13641)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 08dbb1e..1331c1c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -189,6 +189,8 @@
     private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference<>();
     // record bundle split metrics
     private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference<>();
+    // record bundle metrics
+    private AtomicReference<List<Metrics>> bundleMetrics = new AtomicReference<>();
 
     private long bundleSplitCount = 0;
     private long unloadBrokerCount = 0;
@@ -944,7 +946,7 @@
             Map<String, String> dimensions = new HashMap<>();
             dimensions.put("broker", pulsar.getAdvertisedAddress());
             dimensions.put("bundle", bundle);
-            dimensions.put("metric", "loadBalancing");
+            dimensions.put("metric", "bundle");
             Metrics m = Metrics.create(dimensions);
             m.put("brk_bundle_msg_rate_in", stats.msgRateIn);
             m.put("brk_bundle_msg_rate_out", stats.msgRateOut);
@@ -955,7 +957,7 @@
             m.put("brk_bundle_msg_throughput_out", stats.msgThroughputOut);
             metrics.add(m);
         }
-        this.loadBalancingMetrics.set(metrics);
+        this.bundleMetrics.set(metrics);
     }
 
     /**
@@ -1117,6 +1119,10 @@
             metricsCollection.addAll(this.bundleSplitMetrics.get());
         }
 
+        if (this.bundleMetrics.get() != null) {
+            metricsCollection.addAll(this.bundleMetrics.get());
+        }
+
         return metricsCollection;
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 0b3794e..7c1cfd0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -177,6 +177,7 @@
         this.conf.setWebServicePort(Optional.of(0));
         this.conf.setWebServicePortTls(Optional.of(0));
         this.conf.setNumExecutorThreadPoolSize(5);
+        this.conf.setExposeBundlesMetricsInPrometheus(true);
     }
 
     protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 557478b..a747453 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -57,6 +57,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
@@ -342,6 +343,52 @@
     }
 
     @Test
+    public void testBundlesMetrics() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+        }
+
+        pulsar.getBrokerService().updateRates();
+        Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
+        ModularLoadManagerWrapper loadManager = (ModularLoadManagerWrapper)pulsar.getLoadManager().get();
+        loadManager.getLoadManager().updateLocalBrokerData();
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_in"));
+        assertTrue(metrics.containsKey("pulsar_bundle_msg_rate_out"));
+        assertTrue(metrics.containsKey("pulsar_bundle_topics_count"));
+        assertTrue(metrics.containsKey("pulsar_bundle_consumer_count"));
+        assertTrue(metrics.containsKey("pulsar_bundle_producer_count"));
+        assertTrue(metrics.containsKey("pulsar_bundle_msg_throughput_in"));
+        assertTrue(metrics.containsKey("pulsar_bundle_msg_throughput_out"));
+
+        assertTrue(metrics.containsKey("pulsar_lb_cpu_usage"));
+        assertTrue(metrics.containsKey("pulsar_lb_memory_usage"));
+        assertTrue(metrics.containsKey("pulsar_lb_directMemory_usage"));
+        assertTrue(metrics.containsKey("pulsar_lb_bandwidth_in_usage"));
+        assertTrue(metrics.containsKey("pulsar_lb_bandwidth_out_usage"));
+
+        assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
+    }
+
+    @Test
     public void testPerNamespaceStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 5f8fd9e..ce048ab 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -350,7 +350,7 @@
 - cluster: cluster=${pulsar_cluster}. ${pulsar_cluster} is the cluster name that you have configured in the `broker.conf` file.
 - broker: broker=${broker}. ${broker} is the IP address of the broker
 - bundle: bundle=${bundle}. ${bundle} is the bundle range on this broker
-- metric: metric="loadBalancing".
+- metric: metric="bundle".
 
 | Name | Type | Description |
 | --- | --- | --- |