[improve][broker] Skip unloading when bundle throughput is zero (ExtensibleLoadManagerImpl only) (#23626)

(cherry picked from commit e8657e2b94951b0b98797a6e1d943113121b1e53)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index ec26521..9c6e963 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -72,6 +72,12 @@
                     pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
             for (var etr : bundleStats.entrySet()) {
                 String bundle = etr.getKey();
+                var stat = etr.getValue();
+
+                // skip zero traffic bundles
+                if (stat.msgThroughputIn + stat.msgThroughputOut == 0) {
+                    continue;
+                }
                 // TODO: do not filter system topic while shedding
                 if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
                     continue;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7126ccb..72d671a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -528,6 +528,13 @@
 
                     var bundleData = e.stats();
                     double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
+                    if (maxBrokerBundleThroughput == 0) {
+                        if (debugMode) {
+                            log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+                                    + " It has zero throughput.", bundle));
+                        }
+                        continue;
+                    }
                     boolean swap = false;
                     List<Unload> minToMaxUnloads = new ArrayList<>();
                     double minBrokerBundleSwapThroughput = 0.0;
@@ -549,6 +556,9 @@
                                 var minBrokerBundleThroughput =
                                         minBrokerBundleData.stats().msgThroughputIn
                                                 + minBrokerBundleData.stats().msgThroughputOut;
+                                if (minBrokerBundleThroughput == 0) {
+                                    continue;
+                                }
                                 var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
                                 var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
                                 if (maxBrokerNewThroughputTmp < maxBrokerThroughput
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
index 472d44d..3445ab3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
@@ -88,14 +88,17 @@
         var topKBundles = new TopKBundles(pulsar);
         NamespaceBundleStats stats1 = new NamespaceBundleStats();
         stats1.msgRateIn = 100000;
+        stats1.msgThroughputOut = 10;
         bundleStats.put(bundle1, stats1);
 
         NamespaceBundleStats stats2 = new NamespaceBundleStats();
         stats2.msgRateIn = 500;
+        stats2.msgThroughputOut = 10;
         bundleStats.put(bundle2, stats2);
 
         NamespaceBundleStats stats3 = new NamespaceBundleStats();
         stats3.msgRateIn = 10000;
+        stats3.msgThroughputOut = 10;
         bundleStats.put(bundle3, stats3);
 
         NamespaceBundleStats stats4 = new NamespaceBundleStats();
@@ -118,10 +121,12 @@
         var topKBundles = new TopKBundles(pulsar);
         NamespaceBundleStats stats1 = new NamespaceBundleStats();
         stats1.msgRateIn = 500;
+        stats1.msgThroughputOut = 10;
         bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1);
 
         NamespaceBundleStats stats2 = new NamespaceBundleStats();
         stats2.msgRateIn = 10000;
+        stats2.msgThroughputOut = 10;
         bundleStats.put(bundle1, stats2);
 
         topKBundles.update(bundleStats, 2);
@@ -131,6 +136,21 @@
         assertEquals(top0.bundleName(), bundle1);
     }
 
+    @Test
+    public void testZeroMsgThroughputBundleStats() {
+        Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
+        var topKBundles = new TopKBundles(pulsar);
+        NamespaceBundleStats stats1 = new NamespaceBundleStats();
+        bundleStats.put(bundle1, stats1);
+
+        NamespaceBundleStats stats2 = new NamespaceBundleStats();
+        bundleStats.put(bundle1, stats2);
+
+        topKBundles.update(bundleStats, 2);
+
+        assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 0);
+    }
+
 
     private  void setAntiAffinityGroup() throws MetadataStoreException {
         LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup");
@@ -166,10 +186,12 @@
         var topKBundles = new TopKBundles(pulsar);
         NamespaceBundleStats stats1 = new NamespaceBundleStats();
         stats1.msgRateIn = 500;
+        stats1.msgThroughputOut = 10;
         bundleStats.put(bundle1, stats1);
 
         NamespaceBundleStats stats2 = new NamespaceBundleStats();
         stats2.msgRateIn = 10000;
+        stats2.msgThroughputOut = 10;
         bundleStats.put(bundle2, stats2);
 
         topKBundles.update(bundleStats, 2);
@@ -188,10 +210,12 @@
         var topKBundles = new TopKBundles(pulsar);
         NamespaceBundleStats stats1 = new NamespaceBundleStats();
         stats1.msgRateIn = 500;
+        stats1.msgThroughputOut = 10;
         bundleStats.put(bundle1, stats1);
 
         NamespaceBundleStats stats2 = new NamespaceBundleStats();
         stats2.msgRateIn = 10000;
+        stats2.msgThroughputOut = 10;
         bundleStats.put(bundle2, stats2);
 
         topKBundles.update(bundleStats, 2);
@@ -213,10 +237,12 @@
         var topKBundles = new TopKBundles(pulsar);
         NamespaceBundleStats stats1 = new NamespaceBundleStats();
         stats1.msgRateIn = 500;
+        stats1.msgThroughputOut = 10;
         bundleStats.put(bundle1, stats1);
 
         NamespaceBundleStats stats2 = new NamespaceBundleStats();
         stats2.msgRateIn = 10000;
+        stats2.msgThroughputOut = 10;
         bundleStats.put(bundle2, stats2);
 
         topKBundles.update(bundleStats, 2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 48bef15..5e20b19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -919,6 +919,26 @@
         assertEquals(counter.getLoadStd(), setupLoadStd);
     }
 
+    @Test
+    public void testZeroBundleThroughput() {
+        UnloadCounter counter = new UnloadCounter();
+        TransferShedder transferShedder = new TransferShedder(counter);
+        var ctx = setupContext();
+        var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        for (var e : topBundlesLoadDataStore.entrySet()) {
+            for (var stat : e.getValue().getTopBundlesLoadData()) {
+                stat.stats().msgThroughputOut = 0;
+                stat.stats().msgThroughputIn = 0;
+
+            }
+        }
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+        assertTrue(res.isEmpty());
+        assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
+        assertEquals(counter.getLoadAvg(), setupLoadAvg);
+        assertEquals(counter.getLoadStd(), setupLoadStd);
+    }
+
 
     @Test
     public void testTargetStdAfterTransfer() {