blob: 5fbda961c0e3d5e39d800e8e460126e9e37d7c00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import lombok.Cleanup;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class AntiAffinityNamespaceGroupTest extends MockedPulsarServiceBaseTest {
private PulsarTestContext additionalPulsarTestContext;
private PulsarService pulsar1;
private PulsarAdmin admin1;
private PulsarService pulsar2;
protected String primaryHost;
protected String secondaryHost;
private NamespaceBundleFactory nsFactory;
protected Object primaryLoadManager;
private Object secondaryLoadManager;
private PulsarResources resources;
private static Object getField(final Object instance, final String fieldName) throws Exception {
final Field field = instance.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(instance);
}
void setupConfigs(ServiceConfiguration conf){
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(getLoadManagerClassName());
conf.setFailureDomainsEnabled(true);
conf.setLoadBalancerEnabled(true);
// Don't want overloaded threshold to affect namespace placement
conf.setLoadBalancerBrokerOverloadedThresholdPercentage(400);
}
@BeforeClass
@Override
public void setup() throws Exception {
setupConfigs(conf);
super.internalSetup(conf);
pulsar1 = pulsar;
primaryHost = pulsar1.getBrokerId();
admin1 = admin;
var config2 = getDefaultConf();
setupConfigs(config2);
additionalPulsarTestContext = createAdditionalPulsarTestContext(config2);
pulsar2 = additionalPulsarTestContext.getPulsarService();
secondaryHost = pulsar2.getBrokerId();
primaryLoadManager = getField(pulsar1.getLoadManager().get(), "loadManager");
secondaryLoadManager = getField(pulsar2.getLoadManager().get(), "loadManager");
nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
Awaitility.await().untilAsserted(() -> {
assertEquals(pulsar1.getState(), PulsarService.State.Started);
assertEquals(pulsar2.getState(), PulsarService.State.Started);
});
admin1.tenants().createTenant("my-tenant",
createDefaultTenantInfo());
}
@Override
@AfterClass
protected void cleanup() throws Exception {
pulsar1 = null;
pulsar2.close();
super.internalCleanup();
this.additionalPulsarTestContext.close();
}
protected void beforePulsarStart(PulsarService pulsar) throws Exception {
if (resources == null) {
MetadataStoreExtended localStore = pulsar.createLocalMetadataStore(null);
MetadataStoreExtended configStore = (MetadataStoreExtended) pulsar.createConfigurationMetadataStore(null);
resources = new PulsarResources(localStore, configStore);
}
this.createNamespaceIfNotExists(resources, NamespaceName.SYSTEM_NAMESPACE.getTenant(),
NamespaceName.SYSTEM_NAMESPACE);
}
protected void createNamespaceIfNotExists(PulsarResources resources,
String publicTenant,
NamespaceName ns) throws Exception {
TenantResources tr = resources.getTenantResources();
NamespaceResources nsr = resources.getNamespaceResources();
if (!tr.tenantExists(publicTenant)) {
tr.createTenant(publicTenant,
TenantInfo.builder()
.adminRoles(Sets.newHashSet(conf.getSuperUserRoles()))
.allowedClusters(Sets.newHashSet(conf.getClusterName()))
.build());
}
if (!nsr.namespaceExists(ns)) {
Policies nsp = new Policies();
nsp.replication_clusters = Collections.singleton(conf.getClusterName());
nsr.createPolicies(ns, nsp);
}
}
protected Object getBundleOwnershipData(){
return ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
}
protected String getLoadManagerClassName() {
return ModularLoadManagerImpl.class.getName();
}
@Test
public void testClusterDomain() {
}
/**
*
* It verifies anti-affinity-namespace assignment with failure-domain
*
* <pre>
* Domain Brokers-count
* ________ ____________
* domain-0 broker-0,broker-1
* domain-1 broker-2,broker-3
*
* Anti-affinity-namespace assignment
*
* (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
* (2) ns1 -> candidate-brokers: b2, b3 => selected b2
* (3) ns2 -> candidate-brokers: b1, b3 => selected b1
* (4) ns3 -> candidate-brokers: b3 => selected b3
* (5) ns4 -> candidate-brokers: b0, b1, b2, b3 => selected b0
*
* "candidate" broker to own anti-affinity-namespace = b2 or b4
*
* </pre>
*
*/
@Test
public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
final String namespace = "my-tenant/test/my-ns";
final int totalNamespaces = 5;
final String namespaceAntiAffinityGroup = "my-antiaffinity";
final String bundle = "/0x00000000_0xffffffff";
final int totalBrokers = 4;
pulsar1.getConfiguration().setFailureDomainsEnabled(true);
for (int i = 0; i < totalNamespaces; i++) {
final String ns = namespace + i;
admin1.namespaces().createNamespace(ns);
admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
}
Set<String> brokers = new HashSet<>();
Map<String, String> brokerToDomainMap = new HashMap<>();
brokers.add("brokerName-0");
brokerToDomainMap.put("brokerName-0", "domain-0");
brokers.add("brokerName-1");
brokerToDomainMap.put("brokerName-1", "domain-0");
brokers.add("brokerName-2");
brokerToDomainMap.put("brokerName-2", "domain-1");
brokers.add("brokerName-3");
brokerToDomainMap.put("brokerName-3", "domain-1");
Set<String> candidate = new HashSet<>();
Object brokerToNamespaceToBundleRange = getBundleOwnershipData();
assertEquals(brokers.size(), totalBrokers);
String assignedNamespace = namespace + "0" + bundle;
candidate.addAll(brokers);
// for namespace-0 all brokers available
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, brokers,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(brokers.size(), totalBrokers);
// add namespace-0 to broker-0 of domain-0 => state: n0->b0
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-0", namespace + "0", assignedNamespace);
candidate.addAll(brokers);
// for namespace-1 only domain-1 brokers are available as broker-0 already owns namespace-0
assignedNamespace = namespace + "1" + bundle;
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(candidate.size(), 2);
candidate.forEach(broker -> assertEquals(brokerToDomainMap.get(broker), "domain-1"));
// add namespace-1 to broker-2 of domain-1 => state: n0->b0, n1->b2
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-2", namespace + "1", assignedNamespace);
candidate.addAll(brokers);
// for namespace-2 only brokers available are : broker-1 and broker-3
assignedNamespace = namespace + "2" + bundle;
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(candidate.size(), 2);
assertTrue(candidate.contains("brokerName-1"));
assertTrue(candidate.contains("brokerName-3"));
// add namespace-2 to broker-1 of domain-0 => state: n0->b0, n1->b2, n2->b1
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-1", namespace + "2", assignedNamespace);
candidate.addAll(brokers);
// for namespace-3 only brokers available are : broker-3
assignedNamespace = namespace + "3" + bundle;
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(candidate.size(), 1);
assertTrue(candidate.contains("brokerName-3"));
// add namespace-3 to broker-3 of domain-1 => state: n0->b0, n1->b2, n2->b1, n3->b3
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-3", namespace + "3", assignedNamespace);
candidate.addAll(brokers);
// for namespace-4 only brokers available are : all 4 brokers
assignedNamespace = namespace + "4" + bundle;
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, brokerToDomainMap);
assertEquals(candidate.size(), 4);
}
/**
* It verifies anti-affinity-namespace assignment without failure-domain enabled
*
* <pre>
* Anti-affinity-namespace assignment
*
* (1) ns0 -> candidate-brokers: b0, b1, b2 => selected b0
* (2) ns1 -> candidate-brokers: b1, b2 => selected b1
* (3) ns2 -> candidate-brokers: b2 => selected b2
* (5) ns3 -> candidate-brokers: b0, b1, b2 => selected b0
* </pre>
*
*
* @throws Exception
*/
@Test
public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception {
final String namespace = "my-tenant/test/my-ns-wo-domain";
final int totalNamespaces = 5;
final String namespaceAntiAffinityGroup = "my-antiaffinity-wo-domain";
final String bundle = "/0x00000000_0xffffffff";
for (int i = 0; i < totalNamespaces; i++) {
final String ns = namespace + i;
admin1.namespaces().createNamespace(ns);
admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
}
Set<String> brokers = new HashSet<>();
Set<String> candidate = new HashSet<>();
Object brokerToNamespaceToBundleRange = getBundleOwnershipData();
brokers.add("broker-0");
brokers.add("broker-1");
brokers.add("broker-2");
String assignedNamespace = namespace + "0" + bundle;
// all brokers available so, candidate will be all 3 brokers
candidate.addAll(brokers);
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, brokers,
brokerToNamespaceToBundleRange, null);
assertEquals(brokers.size(), 3);
// add ns-0 to broker-0
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
candidate.addAll(brokers);
assignedNamespace = namespace + "1" + bundle;
// available brokers for ns-1 => broker-1, broker-2
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, null);
assertEquals(candidate.size(), 2);
assertTrue(candidate.contains("broker-1"));
assertTrue(candidate.contains("broker-2"));
// add ns-1 to broker-1
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
candidate.addAll(brokers);
// available brokers for ns-2 => broker-2
assignedNamespace = namespace + "2" + bundle;
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, null);
assertEquals(candidate.size(), 1);
assertTrue(candidate.contains("broker-2"));
// add ns-2 to broker-2
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
candidate.addAll(brokers);
// available brokers for ns-3 => broker-0, broker-1, broker-2
assignedNamespace = namespace + "3" + bundle;
filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
brokerToNamespaceToBundleRange, null);
assertEquals(candidate.size(), 3);
}
protected void selectBrokerForNamespace(
Object ownershipData,
String broker, String namespace, String assignedBundleName) {
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>
brokerToNamespaceToBundleRange =
(ConcurrentOpenHashMap<String,
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>) ownershipData;
ConcurrentOpenHashSet<String> bundleSet =
ConcurrentOpenHashSet.<String>newBuilder().build();
bundleSet.add(assignedBundleName);
ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap =
ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder().build();
nsToBundleMap.put(namespace, bundleSet);
brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
}
/**
* It verifies anti-affinity with failure domain enabled with 2 brokers.
*
* Note: in this class's setup method, the LoadBalancerBrokerOverloadedThresholdPercentage
* is set to 400 to ensure that the overloaded logic doesn't affect the broker selection.
* Without that configuration, two namespaces in the same anti-affinity group could
* be placed on the same broker. The CPU usage can be over 100%, and if we run with more
* than 4 cores, it could conceivably be above 400%. If this test becomes flaky again,
* look at the logs to see if there is a mention of an overloaded broker.
*
* <pre>
* 1. Register brokers to domain: domain-1: broker1 & domain-2: broker2
* 2. Load-Manager receives a watch and updates brokerToDomain cache with new domain data
* 3. Create two namespace with anti-affinity
* 4. Load-manager selects broker for each namespace such that from different domains
*
* </pre>
*
* @throws Exception
*/
@Test
public void testBrokerSelectionForAntiAffinityGroup() throws Exception {
final String broker1 = primaryHost;
final String broker2 = secondaryHost;
final String cluster = pulsar1.getConfiguration().getClusterName();
final String tenant = "tenant-" + UUID.randomUUID();
final String namespace1 = tenant + "/" + cluster + "/ns1";
final String namespace2 = tenant + "/" + cluster + "/ns2";
final String namespaceAntiAffinityGroup = "group";
FailureDomain domain1 = FailureDomain.builder()
.brokers(Collections.singleton(broker1))
.build();
admin1.clusters().createFailureDomain(cluster, "domain1", domain1);
FailureDomain domain2 = FailureDomain.builder()
.brokers(Collections.singleton(broker2))
.build();
admin1.clusters().createFailureDomain(cluster, "domain2", domain2);
admin1.tenants().createTenant(tenant, new TenantInfoImpl(null, Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(namespace1);
admin1.namespaces().createNamespace(namespace2);
admin1.namespaces().setNamespaceAntiAffinityGroup(namespace1, namespaceAntiAffinityGroup);
admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, namespaceAntiAffinityGroup);
// validate strategically if brokerToDomainCache updated
Awaitility.await().untilAsserted(() -> {
assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager));
assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
});
ServiceUnitId serviceUnit1 = makeBundle(tenant, cluster, "ns1");
String selectedBroker1 = selectBroker(serviceUnit1, primaryLoadManager);
ServiceUnitId serviceUnit2 = makeBundle(tenant, cluster, "ns2");
String selectedBroker2 = selectBroker(serviceUnit2, primaryLoadManager);
assertNotEquals(selectedBroker1, selectedBroker2);
}
protected String selectBroker(ServiceUnitId serviceUnit, Object loadManager) {
return ((ModularLoadManager) loadManager).selectBrokerForAssignment(serviceUnit).get();
}
/**
* It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't
* cause uneven anti-affinity namespace distribution.
*
* <pre>
* 1. broker1 owns ns-0 => broker1 can unload ns-0
* 1. broker2 owns ns-1 => broker1 can unload ns-0
* 1. broker3 owns ns-2 => broker1 can't unload ns-0 as all brokers have same no NS
* </pre>
*
* @throws Exception
*/
@Test
public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
final String namespace = "my-tenant/test/my-ns-load-shedding-util";
final int totalNamespaces = 5;
final String namespaceAntiAffinityGroup = "my-antiaffinity-load-shedding-util";
final String bundle = "/0x00000000_0xffffffff";
for (int i = 0; i < totalNamespaces; i++) {
final String ns = namespace + i;
admin1.namespaces().createNamespace(ns);
admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
}
Set<String> brokers = new HashSet<>();
Set<String> candidate = new HashSet<>();
Object brokerToNamespaceToBundleRange = getBundleOwnershipData();
brokers.add("broker-0");
brokers.add("broker-1");
brokers.add("broker-2");
String assignedNamespace = namespace + "0" + bundle;
// all brokers available so, candidate will be all 3 brokers
candidate.addAll(brokers);
// add ns-0 to broker-0
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
String currentBroker = "broker-0";
boolean shouldUnload = shouldAntiAffinityNamespaceUnload(namespace + "0", bundle,
currentBroker, pulsar1, brokerToNamespaceToBundleRange, candidate);
assertTrue(shouldUnload);
// add ns-1 to broker-1
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
shouldUnload = shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
pulsar1, brokerToNamespaceToBundleRange, candidate);
assertTrue(shouldUnload);
// add ns-2 to broker-2
selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
shouldUnload = shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
pulsar1, brokerToNamespaceToBundleRange, candidate);
assertFalse(shouldUnload);
}
/**
* It verifies that load-manager::shouldAntiAffinityNamespaceUnload checks that unloading should only happen if all
* brokers have same number of anti-affinity namespaces
*
* @throws Exception
*/
@Test
public void testLoadSheddingWithAntiAffinityNamespace() throws Exception {
final String namespace = "my-tenant/test/my-ns-load-shedding";
final int totalNamespaces = 5;
final String namespaceAntiAffinityGroup = "my-antiaffinity-load-shedding";
final String bundle = "0x00000000_0xffffffff";
for (int i = 0; i < totalNamespaces; i++) {
final String ns = namespace + i;
admin1.namespaces().createNamespace(ns);
admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
}
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "0/my-topic1")
.create();
pulsar1.getBrokerService().updateRates();
verifyLoadSheddingWithAntiAffinityNamespace(namespace + "0", bundle);
producer.close();
}
protected void verifyLoadSheddingWithAntiAffinityNamespace(String namespace, String bundle) {
ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
.getLoadManager().get()).getLoadManager();
loadManager.updateAll();
assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace, bundle, primaryHost));
}
protected boolean isLoadManagerUpdatedDomainCache(Object loadManager) throws Exception {
@SuppressWarnings("unchecked")
var brokerToFailureDomainMap = (Map<String, String>)
FieldUtils.readDeclaredField(loadManager, "brokerToFailureDomainMap", true);
return !brokerToFailureDomainMap.isEmpty();
}
private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
return nsFactory.getBundle(NamespaceName.get(property, cluster, namespace),
Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND,
BoundType.CLOSED));
}
private static void filterAntiAffinityGroupOwnedBrokers(
PulsarService pulsar,
String assignedNamespace,
Set<String> brokers,
Object ownershipData,
Map<String, String> brokerToDomainMap) {
if (ownershipData instanceof Set) {
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, assignedNamespace, brokers,
(Set<Map.Entry<String, ServiceUnitStateData>>) ownershipData, brokerToDomainMap);
} else if (ownershipData instanceof ConcurrentOpenHashMap) {
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, assignedNamespace, brokers,
(ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>)
ownershipData, brokerToDomainMap);
} else {
throw new RuntimeException("Unknown ownershipData class type");
}
}
private static boolean shouldAntiAffinityNamespaceUnload(
String namespace,
String bundle,
String currentBroker,
PulsarService pulsar,
Object ownershipData,
Set<String> candidate) throws Exception {
if (ownershipData instanceof Set) {
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
currentBroker, pulsar, (Set<Map.Entry<String, ServiceUnitStateData>>) ownershipData, candidate);
} else if (ownershipData instanceof ConcurrentOpenHashMap) {
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle,
currentBroker, pulsar,
(ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>)
ownershipData, candidate);
} else {
throw new RuntimeException("Unknown ownershipData class type");
}
}
}