[improve][broker] Add broker filter sync method back to guarantee the API compatibility (#20826)
(cherry picked from commit 69d7a2bf14555f11a716a9545c5cf391d8179a27)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index f3c5e5d..cba499e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -493,7 +493,7 @@
new ArrayList<>(filterPipeline.size());
for (final BrokerFilter filter : filterPipeline) {
CompletableFuture<Map<String, BrokerLookupData>> future =
- filter.filter(availableBrokerCandidates, bundle, context);
+ filter.filterAsync(availableBrokerCandidates, bundle, context);
futures.add(future);
}
CompletableFuture<Optional<String>> result = new CompletableFuture<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
index df08ba9..37b35d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/AntiAffinityGroupPolicyFilter.java
@@ -39,7 +39,7 @@
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(
Map<String, BrokerLookupData> brokers, ServiceUnitId serviceUnitId, LoadManagerContext context) {
return helper.filterAsync(brokers, serviceUnitId.toString());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 046cae4..2950a01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -42,8 +43,23 @@
* @param context The load manager context.
* @return Filtered broker list.
*/
- CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context);
+ @Deprecated
+ default Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) throws BrokerFilterException {
+ return filterAsync(brokers, serviceUnit, context).join();
+ }
+
+ /**
+ * Filter out async unqualified brokers based on implementation.
+ *
+ * @param brokers The full broker and lookup data.
+ * @param serviceUnit The current serviceUnit.
+ * @param context The load manager context.
+ * @return Filtered broker list.
+ */
+ CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
index 0aa1dda..306c4c3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
@@ -44,9 +44,9 @@
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> availableBrokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> availableBrokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
return isolationPoliciesHelper.applyIsolationPoliciesAsync(availableBrokers, serviceUnit)
.thenApply(brokerCandidateCache -> {
availableBrokers.keySet().retainAll(brokerCandidateCache);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
index 6504da0..54d2a55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilter.java
@@ -34,7 +34,7 @@
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(
Map<String, BrokerLookupData> brokers,
ServiceUnitId serviceUnit,
LoadManagerContext context) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index 044dcc8..472cabf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -36,9 +36,9 @@
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
index 6e8ce5e..1af39a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -47,9 +47,9 @@
*
*/
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
ServiceConfiguration conf = context.brokerConfiguration();
if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
return CompletableFuture.completedFuture(brokers);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7bb16ba..cd5c17f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -719,7 +719,7 @@
Map<String, BrokerLookupData> candidates = new HashMap<>(availableBrokers);
for (var filter : brokerFilterPipeline) {
try {
- filter.filter(candidates, namespaceBundle, context)
+ filter.filterAsync(candidates, namespaceBundle, context)
.get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
index 8a73a5f..2293ecd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/AntiAffinityNamespaceGroupExtensionTest.java
@@ -130,7 +130,7 @@
doReturn(namespace + "/" + bundle).when(namespaceBundle).toString();
var expected = new HashMap<>(brokers);
- var actual = antiAffinityGroupPolicyFilter.filter(
+ var actual = antiAffinityGroupPolicyFilter.filterAsync(
brokers, namespaceBundle, context).get();
assertEquals(actual, expected);
@@ -141,7 +141,7 @@
var srcBroker = serviceUnitStateChannel.getOwnerAsync(namespaceBundle.toString())
.get(5, TimeUnit.SECONDS).get();
expected.remove(srcBroker);
- actual = antiAffinityGroupPolicyFilter.filter(
+ actual = antiAffinityGroupPolicyFilter.filterAsync(
brokers, namespaceBundle, context).get();
assertEquals(actual, expected);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index cff92ab..410c10e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -267,9 +267,9 @@
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
brokers.remove(pulsar1.getLookupServiceAddress());
return CompletableFuture.completedFuture(brokers);
}
@@ -288,9 +288,9 @@
doReturn(List.of(new MockBrokerFilter() {
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
brokers.remove(brokers.keySet().iterator().next());
return FutureUtil.failedFuture(new BrokerFilterException("Test"));
}
@@ -513,17 +513,17 @@
String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
brokers.remove(lookupServiceAddress1);
return CompletableFuture.completedFuture(brokers);
}
},new MockBrokerFilter() {
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
return FutureUtil.failedFuture(new BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
index b8d5d98..f45e140 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -83,20 +83,20 @@
BrokerIsolationPoliciesFilter filter = new BrokerIsolationPoliciesFilter(isolationPoliciesHelper);
// a. available-brokers: broker1, broker2, broker3 => result: broker1
- Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(Map.of(
+ Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName, getContext()).get();
assertEquals(result.keySet(), Set.of("broker1"));
// b. available-brokers: broker2, broker3 => result: broker2
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName, getContext()).get();
assertEquals(result.keySet(), Set.of("broker2"));
// c. available-brokers: broker3 => result: NULL
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker3", getLookupData())), namespaceName, getContext()).get();
assertTrue(result.isEmpty());
@@ -104,20 +104,20 @@
setIsolationPolicies(policies, namespaceName, Set.of("broker1"), Set.of("broker2"), Set.of("broker3"), 2);
// a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName, getContext()).get();
assertEquals(result.keySet(), Set.of("broker1", "broker2"));
// b. available-brokers: broker2, broker3 => result: broker2
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceName, getContext()).get();
assertEquals(result.keySet(), Set.of("broker2"));
// c. available-brokers: broker3 => result: NULL
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker3", getLookupData())), namespaceName, getContext()).get();
assertTrue(result.isEmpty());
}
@@ -141,14 +141,14 @@
- Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(Map.of(
+ Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceBundle, getContext()).get();
assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(true, false),
"broker2", getLookupData(true, false),
"broker3", getLookupData())), namespaceBundle, getContext()).get();
@@ -156,13 +156,13 @@
doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceBundle, getContext()).get();
assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
- result = filter.filter(new HashMap<>(Map.of(
+ result = filter.filterAsync(new HashMap<>(Map.of(
"broker1", getLookupData(false, true),
"broker2", getLookupData(),
"broker3", getLookupData())), namespaceBundle, getContext()).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
index 6b35bf7..17475b4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerLoadManagerClassFilterTest.java
@@ -49,14 +49,14 @@
"broker5", getLookupData("3.0.0", null)
);
- Map<String, BrokerLookupData> result = filter.filter(new HashMap<>(originalBrokers), null, context).get();
+ Map<String, BrokerLookupData> result = filter.filterAsync(new HashMap<>(originalBrokers), null, context).get();
assertEquals(result, Map.of(
"broker1", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName()),
"broker2", getLookupData("3.0.0", ExtensibleLoadManagerImpl.class.getName())
));
context.brokerConfiguration().setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
- result = filter.filter(new HashMap<>(originalBrokers), null, context).get();
+ result = filter.filterAsync(new HashMap<>(originalBrokers), null, context).get();
assertEquals(result, Map.of(
"broker3", getLookupData("3.0.0", ModularLoadManagerImpl.class.getName()),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
index a621430..9d000cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
@@ -60,7 +60,7 @@
"broker4", getLookupData()
);
Map<String, BrokerLookupData> result =
- filter.filter(new HashMap<>(originalBrokers), null, context).get();
+ filter.filterAsync(new HashMap<>(originalBrokers), null, context).get();
assertEquals(result, Map.of(
"broker2", getLookupData(),
"broker4", getLookupData()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
index 4a1a454..2aa7fae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -43,7 +43,7 @@
@Test
public void testFilterEmptyBrokerList() throws BrokerFilterException, ExecutionException, InterruptedException {
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new HashMap<>(), null, getContext()).get();
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filterAsync(new HashMap<>(), null, getContext()).get();
assertTrue(result.isEmpty());
}
@@ -60,7 +60,7 @@
);
Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(brokers, null, context).get();
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filterAsync(brokers, null, context).get();
assertEquals(result, originalBrokers);
}
@@ -73,7 +73,7 @@
"localhost:6653", getLookupData("2.10.1")
);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filterAsync(
new HashMap<>(originalBrokers), null, getContext()).get();
assertEquals(result, Map.of(
"localhost:6651", getLookupData("2.10.1"),
@@ -87,7 +87,7 @@
"localhost:6652", getLookupData("2.10.1"),
"localhost:6653", getLookupData("2.10.1")
);
- result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), null, getContext()).get();
+ result = brokerVersionFilter.filterAsync(new HashMap<>(originalBrokers), null, getContext()).get();
assertEquals(result, Map.of(
"localhost:6652", getLookupData("2.10.1"),
@@ -101,7 +101,7 @@
"localhost:6653", getLookupData("2.10.2-SNAPSHOT")
);
- result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), null, getContext()).get();
+ result = brokerVersionFilter.filterAsync(new HashMap<>(originalBrokers), null, getContext()).get();
assertEquals(result, Map.of(
"localhost:6653", getLookupData("2.10.2-SNAPSHOT")
));
@@ -115,7 +115,7 @@
);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
try {
- brokerVersionFilter.filter(new HashMap<>(originalBrokers), null, getContext()).get();
+ brokerVersionFilter.filterAsync(new HashMap<>(originalBrokers), null, getContext()).get();
fail();
} catch (Exception ex) {
assertEquals(ex.getCause().getClass(), BrokerFilterBadVersionException.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 3dd41a9..26d95a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -764,9 +764,9 @@
}
@Override
- public CompletableFuture<Map<String, BrokerLookupData>> filter(Map<String, BrokerLookupData> brokers,
- ServiceUnitId serviceUnit,
- LoadManagerContext context) {
+ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context) {
return FutureUtil.failedFuture(new BrokerFilterException("test"));
}
};