blob: 5937af68ec7f21cb2c2cc514d8ae47b76776f296 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;
import static java.lang.Thread.sleep;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
public class ModularLoadManagerImplTest {
private LocalBookkeeperEnsemble bkEnsemble;
private URL url1;
private PulsarService pulsar1;
private PulsarAdmin admin1;
private URL url2;
private PulsarService pulsar2;
private PulsarAdmin admin2;
private PulsarService pulsar3;
private String primaryHost;
private String secondaryHost;
private NamespaceBundleFactory nsFactory;
private ModularLoadManagerImpl primaryLoadManager;
private ModularLoadManagerImpl secondaryLoadManager;
private ExecutorService executor;
// Invoke non-overloaded method.
private Object invokeSimpleMethod(final Object instance, final String methodName, final Object... args)
throws Exception {
for (Method method : instance.getClass().getDeclaredMethods()) {
if (method.getName().equals(methodName)) {
method.setAccessible(true);
return method.invoke(instance, args);
}
}
throw new IllegalArgumentException("Method not found: " + methodName);
}
private static Object getField(final Object instance, final String fieldName) throws Exception {
final Field field = instance.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(instance);
}
private static void setField(final Object instance, final String fieldName, final Object value) throws Exception {
final Field field = instance.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(instance, value);
}
@BeforeMethod
void setup() throws Exception {
executor = new ThreadPoolExecutor(1, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
// Start broker 1
ServiceConfiguration config1 = new ServiceConfiguration();
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config1.setAdvertisedAddress("localhost");
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
pulsar1 = new PulsarService(config1);
pulsar1.start();
primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
url1 = new URL(pulsar1.getWebServiceAddress());
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
// Start broker 2
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setAdvertisedAddress("localhost");
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
pulsar2 = new PulsarService(config2);
pulsar2.start();
ServiceConfiguration config = new ServiceConfiguration();
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
pulsar3 = new PulsarService(config);
secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
url2 = new URL(pulsar2.getWebServiceAddress());
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
sleep(100);
}
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdownNow();
admin1.close();
admin2.close();
pulsar2.close();
pulsar1.close();
if (pulsar3.isRunning()) {
pulsar3.close();
}
bkEnsemble.stop();
}
private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
return nsFactory.getBundle(NamespaceName.get(property, cluster, namespace),
Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND,
BoundType.CLOSED));
}
private NamespaceBundle makeBundle(final String all) {
return makeBundle(all, all, all);
}
private String mockBundleName(final int i) {
return String.format("%d/%d/%d/0x00000000_0xffffffff", i, i, i);
}
// Test disabled since it's depending on CPU usage in the machine
@Test(enabled = false)
public void testCandidateConsistency() throws Exception {
boolean foundFirst = false;
boolean foundSecond = false;
// After 2 selections, the load balancer should select both brokers due to preallocation.
for (int i = 0; i < 2; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
final String broker = primaryLoadManager.selectBrokerForAssignment(serviceUnit).get();
if (broker.equals(primaryHost)) {
foundFirst = true;
} else {
foundSecond = true;
}
}
assertTrue(foundFirst);
assertTrue(foundSecond);
// Now disable the secondary broker.
secondaryLoadManager.disableBroker();
LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
// Make sure the second broker is not in the internal map.
Awaitility.await().untilAsserted(() -> assertFalse(loadData.getBrokerData().containsKey(secondaryHost)));
// Try 5 more selections, ensure they all go to the first broker.
for (int i = 2; i < 7; ++i) {
final ServiceUnitId serviceUnit = makeBundle(Integer.toString(i));
assertEquals(primaryLoadManager.selectBrokerForAssignment(serviceUnit), primaryHost);
}
}
// Test that bundles belonging to the same namespace are distributed evenly among brokers.
// Test disabled since it's depending on CPU usage in the machine
@Test(enabled = false)
public void testEvenBundleDistribution() throws Exception {
final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(nsFactory, "test", "test", "test", 16);
int numAssignedToPrimary = 0;
int numAssignedToSecondary = 0;
final BundleData bundleData = new BundleData(10, 1000);
final TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000);
longTermMessageData.setMsgRateIn(1000);
bundleData.setLongTermData(longTermMessageData);
final String firstBundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[0]);
// Write long message rate for first bundle to ensure that even bundle distribution is not a coincidence of
// balancing by message rate. If we were balancing by message rate, one of the brokers should only have this
// one bundle.
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, bundleData).join();
for (final NamespaceBundle bundle : bundles) {
if (primaryLoadManager.selectBrokerForAssignment(bundle).equals(primaryHost)) {
++numAssignedToPrimary;
} else {
++numAssignedToSecondary;
}
if ((numAssignedToPrimary + numAssignedToSecondary) % 2 == 0) {
// On even number of assignments, assert that an equal number of bundles have been assigned between
// them.
assertEquals(numAssignedToPrimary, numAssignedToSecondary);
}
}
}
@Test
public void testBrokerAffinity() throws Exception {
// Start broker 3
pulsar3.start();
final String tenant = "test";
final String cluster = "test";
String namespace = tenant + "/" + cluster + "/" + "test";
String topic = "persistent://" + namespace + "/my-topic1";
admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(namespace, 16);
String topicLookup = admin1.lookups().lookupTopic(topic);
String bundleRange = admin1.lookups().getBundleRange(topic);
String brokerServiceUrl = pulsar1.getBrokerServiceUrl();
String brokerUrl = pulsar1.getSafeWebServiceAddress();
log.debug("initial broker service url - {}", topicLookup);
Random rand=new Random();
if (topicLookup.equals(brokerServiceUrl)) {
int x = rand.nextInt(2);
if (x == 0) {
brokerUrl = pulsar2.getSafeWebServiceAddress();
brokerServiceUrl = pulsar2.getBrokerServiceUrl();
}
else {
brokerUrl = pulsar3.getSafeWebServiceAddress();
brokerServiceUrl = pulsar3.getBrokerServiceUrl();
}
}
brokerUrl = brokerUrl.replaceFirst("http[s]?://", "");
log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl);
String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl();
log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress());
//Make a call to broker which is not a leader
if (!leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress())) {
admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
}
else {
admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
}
sleep(2000);
String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic);
log.debug("final broker service url - {}", topicLookupAfterUnload);
Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload);
}
/**
* It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker
* unless all the brokers are in same state.
*
* <pre>
* 1. Create a bundle whose bundle-resource-quota will contain max-topics
* 2. Load-manager assigns broker to this bundle so, assigned broker is overloaded with max-topics
* 3. For any new further bundles: broker assigns different brokers.
* </pre>
*
* @throws Exception
*/
@Test
public void testMaxTopicDistributionToBroker() throws Exception {
final int totalBundles = 50;
final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(nsFactory, "test", "test", "test",
totalBundles);
final BundleData bundleData = new BundleData(10, 1000);
// it sets max topics under this bundle so, owner of this broker reaches max-topic threshold
bundleData.setTopics(pulsar1.getConfiguration().getLoadBalancerBrokerMaxTopics() + 10);
final TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000);
longTermMessageData.setMsgRateIn(1000);
bundleData.setLongTermData(longTermMessageData);
final String firstBundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[0]);
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath, bundleData).join();
String maxTopicOwnedBroker = primaryLoadManager.selectBrokerForAssignment(bundles[0]).get();
for (int i = 1; i < totalBundles; i++) {
assertNotEquals(primaryLoadManager.selectBrokerForAssignment(bundles[i]), maxTopicOwnedBroker);
}
}
// Test that load shedding works
@Test
public void testLoadShedding() throws Exception {
final NamespaceBundleStats stats1 = new NamespaceBundleStats();
final NamespaceBundleStats stats2 = new NamespaceBundleStats();
stats1.msgRateIn = 100;
stats2.msgRateIn = 200;
final Map<String, NamespaceBundleStats> statsMap = new ConcurrentHashMap<>();
statsMap.put(mockBundleName(1), stats1);
statsMap.put(mockBundleName(2), stats2);
final LocalBrokerData localBrokerData = new LocalBrokerData();
localBrokerData.update(new SystemResourceUsage(), statsMap);
final Namespaces namespacesSpy1 = spy(pulsar1.getAdminClient().namespaces());
AtomicReference<String> bundleReference = new AtomicReference<>();
doAnswer(invocation -> {
bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]);
return null;
}).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
AtomicReference<Optional<String>> selectedBrokerRef = new AtomicReference<>();
ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
doAnswer(invocation -> {
ServiceUnitId serviceUnitId = (ServiceUnitId) invocation.getArguments()[0];
Optional<String> broker = primaryLoadManager.selectBroker(serviceUnitId);
selectedBrokerRef.set(broker);
return broker;
}).when(primaryLoadManagerSpy).selectBroker(any());
setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
primaryLoadManagerSpy.doLoadShedding();
// 80% is below overload threshold: verify nothing is unloaded.
verify(namespacesSpy1, Mockito.times(0))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
localBrokerData.setCpu(new ResourceUsage(90, 100));
primaryLoadManagerSpy.doLoadShedding();
// Most expensive bundle will be unloaded.
verify(namespacesSpy1, Mockito.times(1))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(2));
assertEquals(selectedBrokerRef.get().get(), secondaryHost);
primaryLoadManagerSpy.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be
// unloaded, but this is not the case due to the spy's behavior).
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(1));
assertEquals(selectedBrokerRef.get().get(), secondaryHost);
primaryLoadManagerSpy.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(selectedBrokerRef.get().get(), secondaryHost);
// Test bundle transfer to same broker
loadData.getRecentlyUnloadedBundles().clear();
primaryLoadManagerSpy.doLoadShedding();
verify(namespacesSpy1, Mockito.times(3))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any());
loadData.getRecentlyUnloadedBundles().clear();
primaryLoadManagerSpy.doLoadShedding();
// The bundle shouldn't be unloaded because the broker is the same.
verify(namespacesSpy1, Mockito.times(3))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
}
// Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain
// metrics change by a percentage threshold.
@Test
public void testNeedBrokerDataUpdate() throws Exception {
final LocalBrokerData lastData = new LocalBrokerData();
final LocalBrokerData currentData = new LocalBrokerData();
final ServiceConfiguration conf = pulsar1.getConfiguration();
// Set this manually in case the default changes.
conf.setLoadBalancerReportUpdateThresholdPercentage(5);
// Easier to test using an uninitialized ModularLoadManagerImpl.
final ModularLoadManagerImpl loadManager = new ModularLoadManagerImpl();
setField(loadManager, "lastData", lastData);
setField(loadManager, "localData", currentData);
setField(loadManager, "conf", conf);
Supplier<Boolean> needUpdate = () -> {
try {
return (Boolean) invokeSimpleMethod(loadManager, "needBrokerDataUpdate");
} catch (Exception e) {
throw new RuntimeException(e);
}
};
lastData.setMsgRateIn(100);
currentData.setMsgRateIn(104);
// 4% difference: shouldn't trigger an update.
assert (!needUpdate.get());
currentData.setMsgRateIn(105.1);
// 5% difference: should trigger an update (exactly 5% is flaky due to precision).
assert (needUpdate.get());
// Do similar tests for lower values.
currentData.setMsgRateIn(94);
assert (needUpdate.get());
currentData.setMsgRateIn(95.1);
assert (!needUpdate.get());
// 0 to non-zero should always trigger an update.
lastData.setMsgRateIn(0);
currentData.setMsgRateIn(1e-8);
assert (needUpdate.get());
// non-zero to zero should trigger an update as long as the threshold is less than 100.
lastData.setMsgRateIn(1e-8);
currentData.setMsgRateIn(0);
assert (needUpdate.get());
// zero to zero should never trigger an update.
currentData.setMsgRateIn(0);
lastData.setMsgRateIn(0);
assert (!needUpdate.get());
// Minimally test other absolute values to ensure they are included.
lastData.setCpu(new ResourceUsage(100, 1000));
currentData.setCpu(new ResourceUsage(106, 1000));
assert (!needUpdate.get());
// Minimally test other absolute values to ensure they are included.
lastData.setCpu(new ResourceUsage(100, 1000));
currentData.setCpu(new ResourceUsage(206, 1000));
assert (needUpdate.get());
lastData.setCpu(new ResourceUsage());
currentData.setCpu(new ResourceUsage());
lastData.setMsgThroughputIn(100);
currentData.setMsgThroughputIn(106);
assert (needUpdate.get());
currentData.setMsgThroughputIn(100);
lastData.setNumBundles(100);
currentData.setNumBundles(106);
assert (needUpdate.get());
currentData.setNumBundles(100);
assert (!needUpdate.get());
}
/**
* It verifies that deletion of broker-znode on broker-stop will invalidate availableBrokerCache list
*/
@Test
public void testBrokerStopCacheUpdate() throws Exception {
ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
assertEquals(lm.getAvailableBrokers().size(), 2);
pulsar2.close();
Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1));
}
/**
* It verifies namespace-isolation policies with primary and secondary brokers.
*
* usecase:
*
* <pre>
* 1. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 1
* a. available-brokers: broker1, broker2, broker3 => result: broker1
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* 2. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 2
* a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* </pre>
*/
@Test
public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws Exception {
final String tenant = "my-property";
final String cluster = "use";
final String namespace = "my-ns";
final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
final String sharedBroker = "broker3";
admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + cluster + "/" + namespace);
// set a new policy
String newPolicyJsonTemplate = "{\"namespaces\":[\"%s/%s/%s.*\"],\"primary\":[\"%s\"],"
+ "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
String newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Address,
broker2Address, 1);
String newPolicyName = "my-ns-isolation-policies";
ObjectMapper jsonMapper = ObjectMapperFactory.create();
NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
NamespaceIsolationDataImpl.class);
admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);
SimpleResourceAllocationPolicies simpleResourceAllocationPolicies = new SimpleResourceAllocationPolicies(
pulsar1);
ServiceUnitId serviceUnit = LoadBalancerTestingUtils.makeBundles(nsFactory, tenant, cluster, namespace, 1)[0];
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
return true;
}
@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
return true;
}
};
// (1) now we have isolation policy : primary=broker1, secondary=broker2, minLimit=1
// test1: shared=1, primary=1, secondary=1 => It should return 1 primary broker only
Set<String> brokerCandidateCache = new HashSet<>();
Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker1Address));
// test2: shared=1, primary=0, secondary=1 => It should return 1 secondary broker only
brokerCandidateCache = new HashSet<>();
availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker2Address));
// test3: shared=1, primary=0, secondary=0 => It should return 0 broker
brokerCandidateCache = new HashSet<>();
availableBrokers = Sets.newHashSet(sharedBroker);
LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 0);
// (2) now we will have isolation policy : primary=broker1, secondary=broker2, minLimit=2
newPolicyJson = String.format(newPolicyJsonTemplate, tenant, cluster, namespace, broker1Address,
broker2Address, 2);
nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(), NamespaceIsolationDataImpl.class);
admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);
// test1: shared=1, primary=1, secondary=1 => It should return primary + secondary
brokerCandidateCache = new HashSet<>();
availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 2);
assertTrue(brokerCandidateCache.contains(broker1Address));
assertTrue(brokerCandidateCache.contains(broker2Address));
// test2: shared=1, secondary=1 => It should return secondary
brokerCandidateCache = new HashSet<>();
availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker2Address));
// test3: shared=1, => It should return 0 broker
brokerCandidateCache = new HashSet<>();
availableBrokers = Sets.newHashSet(sharedBroker);
LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 0);
}
@Test
public void testLoadSheddingWithNamespaceIsolationPolicies() throws Exception {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "my-tenant/use/my-ns";
final String bundle = "0x00000000_0xffffffff";
final String brokerAddress = pulsar1.getAdvertisedAddress();
final String broker1Address = pulsar1.getAdvertisedAddress() + 1;
admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(namespace);
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1")
.create();
ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
.getLoadManager().get()).getLoadManager();
pulsar1.getBrokerService().updateRates();
loadManager.updateAll();
// test1: no isolation policy
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));
// test2: as isolation policy, there are not another broker to load the bundle.
String newPolicyJsonTemplate = "{\"namespaces\":[\"%s.*\"],\"primary\":[\"%s\"],"
+ "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
String newPolicyJson = String.format(newPolicyJsonTemplate, namespace, broker1Address,broker1Address, 1);
String newPolicyName = "my-ns-isolation-policies";
ObjectMapper jsonMapper = ObjectMapperFactory.create();
NamespaceIsolationDataImpl nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
NamespaceIsolationDataImpl.class);
admin1.clusters().createNamespaceIsolationPolicy(cluster, newPolicyName, nsPolicyData);
assertFalse(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, broker1Address));
// test3: as isolation policy, there are another can load the bundle.
String newPolicyJson1 = String.format(newPolicyJsonTemplate, namespace, brokerAddress,brokerAddress, 1);
NamespaceIsolationDataImpl nsPolicyData1 = jsonMapper.readValue(newPolicyJson1.getBytes(),
NamespaceIsolationDataImpl.class);
admin1.clusters().updateNamespaceIsolationPolicy(cluster, newPolicyName, nsPolicyData1);
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace, bundle, primaryHost));
producer.close();
}
/**
* It verifies that pulsar-service fails if load-manager tries to create ephemeral znode for broker which is already
* created by other zk-session-id.
*
* @throws Exception
*/
@Test
public void testOwnBrokerZnodeByMultipleBroker() throws Exception {
ServiceConfiguration config = new ServiceConfiguration();
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
PulsarService pulsar = new PulsarService(config);
// create znode using different zk-session
final String brokerZnode = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getAdvertisedAddress() + ":"
+ config.getWebServicePort();
pulsar1.getLocalMetadataStore().put(brokerZnode, new byte[0], Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
try {
pulsar.start();
} catch (PulsarServerException e) {
//Ok.
}
pulsar.close();
}
@Test
public void testRemoveDeadBrokerTimeAverageData() throws Exception {
ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
assertEquals(lm.getAvailableBrokers().size(), 2);
pulsar2.close();
Awaitility.await().untilAsserted(() -> assertEquals(lm.getAvailableBrokers().size(), 1));
lm.updateAll();
List<String> data = pulsar1.getLocalMetadataStore()
.getMetadataCache(TimeAverageBrokerData.class)
.getChildren(BROKER_TIME_AVERAGE_BASE_PATH)
.join();
Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader()));
assertEquals(data.size(), 1);
}
@DataProvider(name = "isV1")
public Object[][] isV1() {
return new Object[][] {{true}, {false}};
}
@Test(dataProvider = "isV1")
public void testBundleDataDefaultValue(boolean isV1) throws Exception {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "my-ns";
NamespaceName ns = isV1 ? NamespaceName.get(tenant, cluster, namespace) : NamespaceName.get(tenant, namespace);
admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(ns.toString(), 16);
// set resourceQuota to the first bundle range.
BundlesData bundlesData = admin1.namespaces().getBundles(ns.toString());
NamespaceBundle namespaceBundle = nsFactory.getBundle(ns,
Range.range(Long.decode(bundlesData.getBoundaries().get(0)), BoundType.CLOSED, Long.decode(bundlesData.getBoundaries().get(1)),
BoundType.OPEN));
ResourceQuota quota = new ResourceQuota();
quota.setMsgRateIn(1024.1);
quota.setMsgRateOut(1024.2);
quota.setBandwidthIn(1024.3);
quota.setBandwidthOut(1024.4);
quota.setMemory(1024.0);
admin1.resourceQuotas().setNamespaceBundleResourceQuota(ns.toString(), namespaceBundle.getBundleRange(), quota);
ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
// get the bundleData of the first bundle range.
// The default value of the bundleData be the same as resourceQuota because the resourceQuota is present.
BundleData defaultBundleData = lm.getBundleDataOrDefault(namespaceBundle.toString());
TimeAverageMessageData shortTermData = defaultBundleData.getShortTermData();
TimeAverageMessageData longTermData = defaultBundleData.getLongTermData();
assertEquals(shortTermData.getMsgRateIn(), 1024.1);
assertEquals(shortTermData.getMsgRateOut(), 1024.2);
assertEquals(shortTermData.getMsgThroughputIn(), 1024.3);
assertEquals(shortTermData.getMsgThroughputOut(), 1024.4);
assertEquals(longTermData.getMsgRateIn(), 1024.1);
assertEquals(longTermData.getMsgRateOut(), 1024.2);
assertEquals(longTermData.getMsgThroughputIn(), 1024.3);
assertEquals(longTermData.getMsgThroughputOut(), 1024.4);
}
@Test
public void testRemoveNonExistBundleData()
throws PulsarAdminException, InterruptedException,
PulsarClientException, PulsarServerException, NoSuchFieldException, IllegalAccessException {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "remove-non-exist-bundle-data-ns";
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;
admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();
// create a lot of topic to fully distributed among bundles.
for (int i = 0; i < 10; i++) {
String topicNameI = topicName + i;
admin1.topics().createPartitionedTopic(topicNameI, 20);
// trigger bundle assignment
pulsarClient.newConsumer().topic(topicNameI)
.subscriptionName("my-subscriber-name2").subscribe();
}
ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm1 = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();
ModularLoadManagerWrapper loadManager2 = (ModularLoadManagerWrapper) pulsar2.getLoadManager().get();
ModularLoadManagerImpl lm2 = (ModularLoadManagerImpl) loadManager2.getLoadManager();
Field executors = lm1.getClass().getDeclaredField("executors");
executors.setAccessible(true);
ExecutorService executorService = (ExecutorService) executors.get(lm1);
assertEquals(lm1.getAvailableBrokers().size(), 2);
pulsar1.getBrokerService().updateRates();
pulsar2.getBrokerService().updateRates();
lm1.writeBrokerDataOnZooKeeper(true);
lm2.writeBrokerDataOnZooKeeper(true);
// wait for metadata store notification finish
CountDownLatch latch = new CountDownLatch(1);
executorService.submit(latch::countDown);
latch.await();
loadManagerWrapper.writeResourceQuotasToZooKeeper();
MetadataCache<BundleData> bundlesCache = pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);
// trigger bundle split
String topicToFindBundle = topicName + 0;
NamespaceBundle bundleWillBeSplit = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant + "/" + namespace;
CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath);
List<String> bundles = children.join();
assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));
// after updateAll no namespace bundle data is deleted from metadata store.
lm1.updateAll();
children = bundlesCache.getChildren(bundleDataPath);
bundles = children.join();
assertFalse(bundles.isEmpty());
assertEquals(bundleNumbers, bundles.size());
NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
pulsar1.getAdminClient().namespaces().splitNamespaceBundle(tenant + "/" + namespace,
bundleWillBeSplit.getBundleRange(),
false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
NamespaceBundles allBundlesAfterSplit =
pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
assertFalse(allBundlesAfterSplit.getBundles().contains(bundleWillBeSplit));
// the bundle data should be deleted
pulsar1.getBrokerService().updateRates();
pulsar2.getBrokerService().updateRates();
lm1.writeBrokerDataOnZooKeeper(true);
lm2.writeBrokerDataOnZooKeeper(true);
latch = new CountDownLatch(1);
// wait for metadata store notification finish
CountDownLatch finalLatch = latch;
executorService.submit(finalLatch::countDown);
latch.await();
loadManagerWrapper.writeResourceQuotasToZooKeeper();
lm1.updateAll();
log.info("update all triggered.");
// check bundle data should be deleted from metadata store.
CompletableFuture<List<String>> childrenAfterSplit = bundlesCache.getChildren(bundleDataPath);
List<String> bundlesAfterSplit = childrenAfterSplit.join();
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}
}