[improve][broker] Skip unloading when bundle throughput is zero (ExtensibleLoadManagerImpl only) (#23626)
(cherry picked from commit e8657e2b94951b0b98797a6e1d943113121b1e53)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index ec26521..9c6e963 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -72,6 +72,12 @@
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
+ var stat = etr.getValue();
+
+ // skip zero traffic bundles
+ if (stat.msgThroughputIn + stat.msgThroughputOut == 0) {
+ continue;
+ }
// TODO: do not filter system topic while shedding
if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) {
continue;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7126ccb..72d671a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -528,6 +528,13 @@
var bundleData = e.stats();
double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
+ if (maxBrokerBundleThroughput == 0) {
+ if (debugMode) {
+ log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ + " It has zero throughput.", bundle));
+ }
+ continue;
+ }
boolean swap = false;
List<Unload> minToMaxUnloads = new ArrayList<>();
double minBrokerBundleSwapThroughput = 0.0;
@@ -549,6 +556,9 @@
var minBrokerBundleThroughput =
minBrokerBundleData.stats().msgThroughputIn
+ minBrokerBundleData.stats().msgThroughputOut;
+ if (minBrokerBundleThroughput == 0) {
+ continue;
+ }
var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
if (maxBrokerNewThroughputTmp < maxBrokerThroughput
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
index 472d44d..3445ab3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
@@ -88,14 +88,17 @@
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 100000;
+ stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);
NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 500;
+ stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);
NamespaceBundleStats stats3 = new NamespaceBundleStats();
stats3.msgRateIn = 10000;
+ stats3.msgThroughputOut = 10;
bundleStats.put(bundle3, stats3);
NamespaceBundleStats stats4 = new NamespaceBundleStats();
@@ -118,10 +121,12 @@
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
+ stats1.msgThroughputOut = 10;
bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1);
NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
+ stats2.msgThroughputOut = 10;
bundleStats.put(bundle1, stats2);
topKBundles.update(bundleStats, 2);
@@ -131,6 +136,21 @@
assertEquals(top0.bundleName(), bundle1);
}
+ @Test
+ public void testZeroMsgThroughputBundleStats() {
+ Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
+ var topKBundles = new TopKBundles(pulsar);
+ NamespaceBundleStats stats1 = new NamespaceBundleStats();
+ bundleStats.put(bundle1, stats1);
+
+ NamespaceBundleStats stats2 = new NamespaceBundleStats();
+ bundleStats.put(bundle1, stats2);
+
+ topKBundles.update(bundleStats, 2);
+
+ assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 0);
+ }
+
private void setAntiAffinityGroup() throws MetadataStoreException {
LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup");
@@ -166,10 +186,12 @@
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
+ stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);
NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
+ stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);
topKBundles.update(bundleStats, 2);
@@ -188,10 +210,12 @@
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
+ stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);
NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
+ stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);
topKBundles.update(bundleStats, 2);
@@ -213,10 +237,12 @@
var topKBundles = new TopKBundles(pulsar);
NamespaceBundleStats stats1 = new NamespaceBundleStats();
stats1.msgRateIn = 500;
+ stats1.msgThroughputOut = 10;
bundleStats.put(bundle1, stats1);
NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats2.msgRateIn = 10000;
+ stats2.msgThroughputOut = 10;
bundleStats.put(bundle2, stats2);
topKBundles.update(bundleStats, 2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 48bef15..5e20b19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -919,6 +919,26 @@
assertEquals(counter.getLoadStd(), setupLoadStd);
}
+ @Test
+ public void testZeroBundleThroughput() {
+ UnloadCounter counter = new UnloadCounter();
+ TransferShedder transferShedder = new TransferShedder(counter);
+ var ctx = setupContext();
+ var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+ for (var e : topBundlesLoadDataStore.entrySet()) {
+ for (var stat : e.getValue().getTopBundlesLoadData()) {
+ stat.stats().msgThroughputOut = 0;
+ stat.stats().msgThroughputIn = 0;
+
+ }
+ }
+ var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+ assertTrue(res.isEmpty());
+ assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
+ assertEquals(counter.getLoadAvg(), setupLoadAvg);
+ assertEquals(counter.getLoadStd(), setupLoadStd);
+ }
+
@Test
public void testTargetStdAfterTransfer() {