[fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits async (#22727)
(cherry picked from commit fd5916cca6ee2041efa3947d19910e16d94d1bee)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 26ee45b..855e8cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -195,13 +195,14 @@
/**
* Get all the bundles that are owned by this broker.
*/
- public Set<NamespaceBundle> getOwnedServiceUnits() {
+ public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
if (!started) {
log.warn("Failed to get owned service units, load manager is not started.");
- return Collections.emptySet();
+ return CompletableFuture.completedFuture(Collections.emptySet());
}
- Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
+
String brokerId = brokerRegistry.getBrokerId();
+ Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
@@ -214,34 +215,26 @@
}).collect(Collectors.toSet());
// Add heartbeat and SLA monitor namespace bundle.
NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
- try {
- NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
- .getFullBundle(heartbeatNamespace);
- ownedServiceUnits.add(fullBundle);
- } catch (Exception e) {
- log.warn("Failed to get heartbeat namespace bundle.", e);
- }
NamespaceName heartbeatNamespaceV2 = NamespaceService
.getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
- try {
- NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
- .getFullBundle(heartbeatNamespaceV2);
- ownedServiceUnits.add(fullBundle);
- } catch (Exception e) {
- log.warn("Failed to get heartbeat namespace V2 bundle.", e);
- }
-
NamespaceName slaMonitorNamespace = NamespaceService
.getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
- try {
- NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
- .getFullBundle(slaMonitorNamespace);
- ownedServiceUnits.add(fullBundle);
- } catch (Exception e) {
- log.warn("Failed to get SLA Monitor namespace bundle.", e);
- }
-
- return ownedServiceUnits;
+ return pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundleAsync(heartbeatNamespace)
+ .thenAccept(ownedServiceUnits::add).exceptionally(ex -> {
+ log.warn("Failed to get heartbeat namespace bundle.", ex);
+ return null;
+ }).thenCompose(ignored -> pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundleAsync(heartbeatNamespaceV2))
+ .thenAccept(ownedServiceUnits::add).exceptionally(ex -> {
+ log.warn("Failed to get heartbeat namespace V2 bundle.", ex);
+ return null;
+ }).thenCompose(ignored -> pulsar.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundleAsync(slaMonitorNamespace))
+ .thenAccept(ownedServiceUnits::add).exceptionally(ex -> {
+ log.warn("Failed to get SLA Monitor namespace bundle.", ex);
+ return null;
+ }).thenApply(ignored -> ownedServiceUnits);
}
public enum Role {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index e04be25..33066d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -806,12 +806,12 @@
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
- var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
- .collect(Collectors.toMap(NamespaceBundle::toString,
- bundle -> getNamespaceOwnershipStatus(true,
- namespaceIsolationPolicies.getPolicyByNamespace(
- bundle.getNamespaceObject()))));
- return CompletableFuture.completedFuture(statusMap);
+ return extensibleLoadManager.getOwnedServiceUnitsAsync()
+ .thenApply(ownedServiceUnits -> ownedServiceUnits.stream()
+ .collect(Collectors.toMap(NamespaceBundle::toString,
+ bundle -> getNamespaceOwnershipStatus(true,
+ namespaceIsolationPolicies.getPolicyByNamespace(
+ bundle.getNamespaceObject())))));
}
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
@@ -1128,7 +1128,17 @@
public Set<NamespaceBundle> getOwnedServiceUnits() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
- return extensibleLoadManager.getOwnedServiceUnits();
+ try {
+ // Block on the async lookup for at most the metadata store operation timeout.
+ return extensibleLoadManager.getOwnedServiceUnitsAsync()
+ .get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before wrapping so the interruption is not lost to callers.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
.collect(Collectors.toSet());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 04d793c..2feaabe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1144,13 +1144,15 @@
.getFullBundle(slaMonitorNamespacePulsar2);
- Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
+ Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnitsAsync()
+ .get(5, TimeUnit.SECONDS);
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
// heartbeat namespace bundle will own by pulsar1
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
- Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
+ Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnitsAsync()
+ .get(5, TimeUnit.SECONDS);
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
@@ -1186,7 +1188,8 @@
ExtensibleLoadManagerImpl extensibleLoadManager,
NamespaceBundle bundle) throws PulsarAdminException {
Awaitility.await().untilAsserted(() -> {
- Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits();
+ Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnitsAsync()
+ .get(5, TimeUnit.SECONDS);
assertTrue(ownedBundles.contains(bundle));
});
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
@@ -1199,9 +1202,11 @@
}
@Test(timeOut = 30 * 1000)
- public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() {
+ public void testGetOwnedServiceUnitsWhenLoadManagerNotStart()
+ throws Exception {
ExtensibleLoadManagerImpl loadManager = new ExtensibleLoadManagerImpl();
- Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnits();
+ Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnitsAsync()
+ .get(5, TimeUnit.SECONDS);
assertNotNull(ownedServiceUnits);
assertTrue(ownedServiceUnits.isEmpty());
}
@@ -1216,7 +1221,7 @@
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
- admin.namespaces().deleteNamespace(namespace, true);
+ admin.namespaces().deleteNamespace(namespace);
}
@Test(timeOut = 30 * 1000)