blob: 7a2314b01a3d10af412e4689cd0f762172ae997e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.awaitility.Awaitility;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* Start two brokers in the same cluster and have them connect to the same zookeeper. When the PulsarService starts, it
* will do the leader election and one of the brokers will become the leader. Then kill that broker and check if the
* second one becomes the leader.
*/
@Test(groups = "broker")
public class LoadBalancerTest {
LocalBookkeeperEnsemble bkEnsemble;
private static final Logger log = LoggerFactory.getLogger(LoadBalancerTest.class);
private static final int MAX_RETRIES = 15;
private static final int BROKER_COUNT = 5;
private final int[] brokerWebServicePorts = new int[BROKER_COUNT];
private final int[] brokerNativeBrokerPorts = new int[BROKER_COUNT];
private final URL[] brokerUrls = new URL[BROKER_COUNT];
private final String[] lookupAddresses = new String[BROKER_COUNT];
private final PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
private final PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
@BeforeMethod
void setup() throws Exception {
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
ZkUtils.createFullPathOptimistic(bkEnsemble.getZkClient(),
SimpleLoadManagerImpl.LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
"{\"loadBalancerStrategy\":\"leastLoadedServer\"}".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
final String localhost = "localhost";
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerServicePort(Optional.of(brokerNativeBrokerPorts[i]));
config.setClusterName("use");
config.setAdvertisedAddress(localhost);
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress(localhost+i);
config.setLoadBalancerEnabled(false);
pulsarServices[i] = new PulsarService(config);
pulsarServices[i].start();
brokerWebServicePorts[i] = pulsarServices[i].getListenPortHTTP().get();
brokerNativeBrokerPorts[i] = pulsarServices[i].getBrokerListenPort().get();
brokerUrls[i] = new URL("http://127.0.0.1" + ":" + brokerWebServicePorts[i]);
lookupAddresses[i] = pulsarServices[i].getBrokerId();
pulsarAdmins[i] = PulsarAdmin.builder().serviceHttpUrl(brokerUrls[i].toString()).build();
}
createNamespacePolicies(pulsarServices[0]);
Thread.sleep(100);
}
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
for (int i = 0; i < BROKER_COUNT; i++) {
pulsarAdmins[i].close();
pulsarServices[i].close();
}
bkEnsemble.stop();
}
private void loopUntilLeaderChangesForAllBroker(List<PulsarService> activePulsars, LeaderBroker oldLeader) {
Awaitility.await()
.pollInterval(1, TimeUnit.SECONDS)
.atMost(MAX_RETRIES, TimeUnit.SECONDS)
.until(() -> {
boolean settled = true;
// Check if the all active pulsar see a new leader.
for (PulsarService pulsar : activePulsars) {
Optional<LeaderBroker> leader = pulsar.getLeaderElectionService().readCurrentLeader().join();
// Check leader a pulsar see is not empty and not the old leader.
if (!leader.isPresent() || leader.get().equals(oldLeader)) {
settled = false;
break;
}
}
return settled;
});
}
/*
* tests that load manager creates its node and writes the initial load report a /loadbalance/brokers tests that
* those load reports can be deserialized and are in valid format tests if the rankings are populated from the load
* reports are not, both broker will have zero rank
*/
@SuppressWarnings("unchecked")
@Test
public void testLoadReportsWrittenOnMetadataStore() throws Exception {
for (int i = 0; i < BROKER_COUNT; i++) {
String path = String.format("%s/%s", SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT,
lookupAddresses[i]);
byte[] loadReportData = pulsarServices[i].getLocalMetadataStore().get(path).join().get().getValue();
assertTrue(loadReportData.length > 0);
log.info("LoadReport {}, {}", lookupAddresses[i], new String(loadReportData));
LoadReport loadReport = ObjectMapperFactory.getMapper().reader().readValue(loadReportData, LoadReport.class);
assertEquals(loadReport.getName(), lookupAddresses[i]);
// Check Initial Ranking is populated in both the brokers
Field ranking = ((SimpleLoadManagerImpl) pulsarServices[i].getLoadManager().get()).getClass()
.getDeclaredField("sortedRankings");
ranking.setAccessible(true);
AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = (AtomicReference<Map<Long, Set<ResourceUnit>>>) ranking
.get(pulsarServices[i].getLoadManager().get());
printSortedRanking(sortedRanking);
// all brokers have same rank to it would be 0 --> set-of-all-the-brokers
int brokerCount = 0;
for (Map.Entry<Long, Set<ResourceUnit>> entry : sortedRanking.get().entrySet()) {
brokerCount += entry.getValue().size();
}
assertEquals(brokerCount, BROKER_COUNT);
TopicName topicName = TopicName.get("persistent://pulsar/use/primary-ns/test-topic");
ResourceUnit found = pulsarServices[i].getLoadManager().get()
.getLeastLoaded(pulsarServices[i].getNamespaceService().getBundle(topicName)).get();
assertNotNull(found);
}
}
/*
* tests rankings get updated when we write the new load reports to the zookeeper on load-balance root node
* tests writing pre-configured load report on the zookeeper translates the pre-calculated rankings
*/
@Test
public void testUpdateLoadReportAndCheckUpdatedRanking() throws Exception {
for (int i = 0; i < BROKER_COUNT; i++) {
LoadReport lr = new LoadReport();
lr.setName(lookupAddresses[i]);
SystemResourceUsage sru = new SystemResourceUsage();
sru.setBandwidthIn(new ResourceUsage(256, 1024000));
sru.setBandwidthOut(new ResourceUsage(250, 1024000));
sru.setMemory(new ResourceUsage(1024, 8192));
sru.setCpu(new ResourceUsage(5, 400));
lr.setSystemResourceUsage(sru);
FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
for (int i = 0; i < BROKER_COUNT; i++) {
MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking");
}
// do lookup for a bunch of bundles
int totalNamespaces = 200;
Map<String, Integer> namespaceOwner = new HashMap<>();
for (int i = 0; i < totalNamespaces; i++) {
TopicName topicName = TopicName.get("persistent://pulsar/use/primary-ns-" + i + "/test-topic");
ResourceUnit found = pulsarServices[0].getLoadManager().get()
.getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(topicName)).get();
if (namespaceOwner.containsKey(found.getResourceId())) {
namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
} else {
namespaceOwner.put(found.getResourceId(), 1);
}
}
// assert that distribution variation is not more than 20%
int averageNamespaces = totalNamespaces / BROKER_COUNT;
int tenPercentOfAverageNamespaces = averageNamespaces / 10;
int lowerBound = averageNamespaces - tenPercentOfAverageNamespaces;
int upperBound = averageNamespaces + tenPercentOfAverageNamespaces;
// assert each broker received ownership of fair amount of namespaces 90%+
for (Map.Entry<String, Integer> broker : namespaceOwner.entrySet()) {
log.info("Count of bundles assigned: {}, {} -- lower-bound: {} - upper-bound: {} ",
broker.getKey(), broker.getValue(), lowerBound, upperBound);
assertTrue(broker.getValue() >= lowerBound && broker.getValue() <= upperBound);
}
}
@SuppressWarnings("unchecked")
@SneakyThrows
private void updateLastReport(LoadManager lm, LoadReport lr){
ResourceLock<LoadReport> lock = (ResourceLock<LoadReport>) FieldUtils.readField(lm, "brokerLock", true);
lock.updateValue(lr).join();
}
private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
.getDeclaredField("sortedRankings");
ranking.setAccessible(true);
@SuppressWarnings("unchecked")
AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = (AtomicReference<Map<Long, Set<ResourceUnit>>>) ranking
.get(pulsar.getLoadManager().get());
return sortedRanking;
}
private void printSortedRanking(AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking) {
log.info("Sorted Ranking Result:");
sortedRanking.get().forEach((score, rus) -> {
for (ResourceUnit ru : rus) {
log.info(" - {}, {}", ru.getResourceId(), score);
}
});
}
/*
* Pre-publish load report to ZK, each broker has: - Difference memory capacity, for the first 3 brokers memory is
* bottleneck, for the 4/5th brokers CPU become bottleneck since memory is big enough - non-bundles assigned so all
* idle resources are available for new bundle Check the broker rankings are the load percentage of each broker.
*/
@Test(timeOut = 30000)
public void testBrokerRanking() throws Exception {
for (int i = 0; i < BROKER_COUNT; i++) {
LoadReport lr = new LoadReport();
lr.setName(lookupAddresses[i]);
SystemResourceUsage sru = new SystemResourceUsage();
sru.setBandwidthIn(new ResourceUsage(0, 1024000));
sru.setBandwidthOut(new ResourceUsage(0, 1024000));
sru.setMemory(new ResourceUsage(1024, 2048 * (i + 1)));
sru.setCpu(new ResourceUsage(60, 400));
lr.setSystemResourceUsage(sru);
updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
for (int i = 0; i < BROKER_COUNT; i++) {
LoadManager loadManager = pulsarServices[i].getLoadManager().get();
Awaitility.await().until(() -> {
Future<?> f = ((SimpleLoadManagerImpl) loadManager).getUpdateRankingHandle();
return f != null && f.isDone();
});
}
// check the ranking result
for (int i = 0; i < BROKER_COUNT; i++) {
AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = getSortedRanking(pulsarServices[i]);
printSortedRanking(sortedRanking);
// brokers' ranking would be:
// 50 --> broker 0 ( 1024 / 2048 )
// 25 --> broker 1 ( 1024 / 4096 )
// 16 --> broker 2 ( 1024 / 6144 )
// 15 --> broker 3 ( 60 / 400 )
// 15 --> broker 4 ( 60 / 400 )
assertEquals(sortedRanking.get().get(50L).size(), 1);
assertEquals(sortedRanking.get().get(25L).size(), 1);
assertEquals(sortedRanking.get().get(16L).size(), 1);
}
}
/*
* Pre-publish load report to ZK, each broker has: - Difference memory capacity, for the first 3 brokers memory is
* bottleneck, for the 4/5th brokers CPU become bottleneck since memory is big enough - already has some bundles
* assigned Check the distribution of new topics is roughly consistent (with <10% variation) with the ranking
*/
@Test
public void testTopicAssignmentWithExistingBundles() throws Exception {
for (int i = 0; i < BROKER_COUNT; i++) {
ResourceQuota defaultQuota = new ResourceQuota();
defaultQuota.setMsgRateIn(20);
defaultQuota.setMsgRateOut(60);
defaultQuota.setBandwidthIn(20000);
defaultQuota.setBandwidthOut(60000);
defaultQuota.setMemory(87);
pulsarServices[i].getBrokerService().getBundlesQuotas().setDefaultResourceQuota(defaultQuota).join();
LoadReport lr = new LoadReport();
lr.setName(lookupAddresses[i]);
SystemResourceUsage sru = new SystemResourceUsage();
sru.setBandwidthIn(new ResourceUsage(0, 1024000));
sru.setBandwidthOut(new ResourceUsage(0, 1024000));
sru.setMemory(new ResourceUsage(0, 2048 * (i + 1)));
sru.setCpu(new ResourceUsage(60, 400));
lr.setSystemResourceUsage(sru);
Map<String, NamespaceBundleStats> bundleStats = new HashMap<String, NamespaceBundleStats>();
for (int j = 0; j < (i + 1) * 5; j++) {
String bundleName = String.format("pulsar/use/primary-ns-%d-%d/0x00000000_0xffffffff", i, j);
NamespaceBundleStats stats = new NamespaceBundleStats();
bundleStats.put(bundleName, stats);
}
lr.setBundleStats(bundleStats);
FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
for (int i = 0; i < BROKER_COUNT; i++) {
MethodUtils.invokeMethod(pulsarServices[0].getLoadManager().get(), true, "updateRanking");
}
// print ranking
for (int i = 0; i < BROKER_COUNT; i++) {
AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = getSortedRanking(pulsarServices[i]);
printSortedRanking(sortedRanking);
}
// check owner of new destinations and verify that the distribution is roughly
// consistent (variation < 10%) with the broker capacity:
int totalNamespaces = 250;
int[] expectedAssignments = new int[] { 17, 34, 51, 68, 85 };
Map<String, Integer> namespaceOwner = new HashMap<>();
for (int i = 0; i < totalNamespaces; i++) {
TopicName topicName = TopicName.get("persistent://pulsar/use/primary-ns-" + i + "/test-topic");
ResourceUnit found = pulsarServices[0].getLoadManager().get()
.getLeastLoaded(pulsarServices[0].getNamespaceService().getBundle(topicName)).get();
if (namespaceOwner.containsKey(found.getResourceId())) {
namespaceOwner.put(found.getResourceId(), namespaceOwner.get(found.getResourceId()) + 1);
} else {
namespaceOwner.put(found.getResourceId(), 1);
}
}
double expectedMaxVariation = 10.0;
for (int i = 0; i < BROKER_COUNT; i++) {
long actualValue = 0;
String resourceId = lookupAddresses[i];
if (namespaceOwner.containsKey(resourceId)) {
actualValue = namespaceOwner.get(resourceId);
}
long expectedValue = expectedAssignments[i];
double variation = Math.abs(actualValue - expectedValue) * 100.0 / expectedValue;
log.info("Topic assignment - {}, actual: {}, expected baseline: {}, variation: {}/%",
lookupAddresses[i], actualValue, expectedValue, String.format("%.2f", variation));
assertTrue(variation < expectedMaxVariation);
}
}
private AtomicReference<Map<String, ResourceQuota>> getRealtimeResourceQuota(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field quotasField = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
.getDeclaredField("realtimeResourceQuotas");
quotasField.setAccessible(true);
AtomicReference<Map<String, ResourceQuota>> realtimeResourceQuotas = (AtomicReference<Map<String, ResourceQuota>>) quotasField
.get(pulsar.getLoadManager().get());
return realtimeResourceQuotas;
}
private void printResourceQuotas(Map<String, ResourceQuota> resourceQuotas) throws Exception {
log.info("Realtime Resource Quota:");
for (Map.Entry<String, ResourceQuota> entry : resourceQuotas.entrySet()) {
String quotaStr = ObjectMapperFactory.getMapper().writer().writeValueAsString(entry.getValue());
log.info(" {}, {}", entry.getKey(), quotaStr);
}
}
private void writeLoadReportsForDynamicQuota(long timestamp) throws Exception {
for (int i = 0; i < BROKER_COUNT; i++) {
LoadReport lr = new LoadReport();
lr.setName(lookupAddresses[i]);
lr.setTimestamp(timestamp);
SystemResourceUsage sru = new SystemResourceUsage();
sru.setBandwidthIn(new ResourceUsage(5000 * (10 + i * 5), 1024000));
sru.setBandwidthOut(new ResourceUsage(15000 * (10 + i * 5), 1024000));
sru.setMemory(new ResourceUsage(25 * (10 + i * 5), 2048 * (i + 1)));
sru.setCpu(new ResourceUsage(200, 400));
lr.setSystemResourceUsage(sru);
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
for (int j = 0; j < 5; j++) {
String bundleName = String.format("pulsar/use/primary-ns-%d-%d/0x00000000_0xffffffff", i, j);
NamespaceBundleStats stats = new NamespaceBundleStats();
stats.msgRateIn = 5 * (i + j);
stats.msgRateOut = 15 * (i + j);
stats.msgThroughputIn = 5000 * (i + j);
stats.msgThroughputOut = 15000 * (i + j);
stats.topics = 25L * (i + j);
stats.consumerCount = 50 * (i + j);
stats.producerCount = 50 * (i + j);
bundleStats.put(bundleName, stats);
}
lr.setBundleStats(bundleStats);
updateLastReport(pulsarServices[i].getLoadManager().get(), lr);
}
}
private void verifyBundleResourceQuota(ResourceQuota quota, double expMsgRateIn, double expMsgRateOut,
double expBandwidthIn, double expBandwidthOut, double expMemory) {
assertTrue(Math.abs(quota.getMsgRateIn() - expMsgRateIn) < 1);
assertTrue(Math.abs(quota.getMsgRateOut() - expMsgRateOut) < 1);
assertTrue(Math.abs(quota.getBandwidthIn() - expBandwidthIn) < 1);
assertTrue(Math.abs(quota.getBandwidthOut() - expBandwidthOut) < 1);
assertTrue(Math.abs(quota.getMemory() - expMemory) < 1);
}
/*
* Test broker dynamically calculating resource quota for each connected namespace bundle.
*/
@Test
public void testDynamicNamespaceBundleQuota() throws Exception {
long startTime = System.currentTimeMillis();
for (int i = 0; i < BROKER_COUNT; i++) {
ResourceQuota defaultQuota = new ResourceQuota();
defaultQuota.setMsgRateIn(20);
defaultQuota.setMsgRateOut(60);
defaultQuota.setBandwidthIn(20000);
defaultQuota.setBandwidthOut(60000);
defaultQuota.setMemory(75);
pulsarServices[i].getBrokerService().getBundlesQuotas().setDefaultResourceQuota(defaultQuota).join();
}
// publish the initial load reports and wait for quotas be updated
writeLoadReportsForDynamicQuota(startTime);
Thread.sleep(5000);
// publish test report for 15 minutes later and wait for quotas be updated
writeLoadReportsForDynamicQuota(startTime + SimpleLoadManagerImpl.RESOURCE_QUOTA_GO_UP_TIMEWINDOW);
Thread.sleep(5000);
// print & verify resource quotas
for (int i = 0; i < BROKER_COUNT; i++) {
Map<String, ResourceQuota> quotas = getRealtimeResourceQuota(pulsarServices[i]).get();
printResourceQuotas(quotas);
verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-0-0/0x00000000_0xffffffff"), 19.0, 58.0,
19791.0, 58958.0, 74.0);
verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-2-2/0x00000000_0xffffffff"), 20.0, 60.0,
20000.0, 60000.0, 100.0);
verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-4-4/0x00000000_0xffffffff"), 40.0, 120.0,
40000.0, 120000.0, 150.0);
}
// publish test report for 24 hours later and wait for quotas be updated
writeLoadReportsForDynamicQuota(startTime + SimpleLoadManagerImpl.RESOURCE_QUOTA_GO_DOWN_TIMEWINDOW);
Thread.sleep(5000);
// print & verify resource quotas
for (int i = 0; i < BROKER_COUNT; i++) {
Map<String, ResourceQuota> quotas = getRealtimeResourceQuota(pulsarServices[i]).get();
printResourceQuotas(quotas);
verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-0-0/0x00000000_0xffffffff"), 5.0, 6.0, 10203.0,
11019.0, 50.0);
verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-2-2/0x00000000_0xffffffff"), 20.0, 60.0,
20000.0, 60000.0, 100.0);
verifyBundleResourceQuota(quotas.get("pulsar/use/primary-ns-4-4/0x00000000_0xffffffff"), 40.0, 120.0,
40000.0, 120000.0, 150.0);
}
}
private NamespaceBundleStats newBundleStats(long topics, int producers, int consumers, double msgRateIn,
double msgRateOut, double throughputIn, double throughputOut) {
NamespaceBundleStats stats = new NamespaceBundleStats();
stats.topics = topics;
stats.producerCount = producers;
stats.consumerCount = consumers;
stats.msgRateIn = msgRateIn;
stats.msgRateOut = msgRateOut;
stats.msgThroughputIn = throughputIn;
stats.msgThroughputOut = throughputOut;
return stats;
}
private BundlesData getBundles(int numBundles) {
Long maxVal = ((long) 1) << 32;
Long segSize = maxVal / numBundles;
List<String> partitions = new ArrayList<>();
partitions.add(String.format("0x%08x", 0l));
Long curPartition = segSize;
for (int i = 0; i < numBundles; i++) {
if (i != numBundles - 1) {
partitions.add(String.format("0x%08x", curPartition));
} else {
partitions.add(String.format("0x%08x", maxVal - 1));
}
curPartition += segSize;
}
return BundlesData.builder()
.boundaries(partitions)
.numBundles(partitions.size() - 1)
.build();
}
private void createNamespace(PulsarService pulsar, String namespace, int numBundles) throws Exception {
Policies policies = new Policies();
policies.bundles = getBundles(numBundles);
pulsar.getPulsarResources().getNamespaceResources().createPolicies(NamespaceName.get(namespace), policies);
}
/**
* Test the namespace bundle auto-split
*/
@Test
public void testNamespaceBundleAutoSplit() throws Exception {
int maxBundles = pulsarServices[0].getConfiguration().getLoadBalancerNamespaceMaximumBundles();
long maxTopics = pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxTopics();
int maxSessions = pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxSessions();
long maxMsgRate = pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBandwidth = pulsarServices[0].getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1048576L;
pulsarServices[0].getConfiguration().setLoadBalancerAutoBundleSplitEnabled(true);
// create namespaces
for (int i = 1; i <= 10; i++) {
int numBundles = (i == 10) ? maxBundles : 2;
createNamespace(pulsarServices[0], String.format("pulsar/use/primary-ns-%02d", i), numBundles);
}
// fake Namespaces Admin
CompletableFuture<NamespacesImpl> namespaceAdminFuture = new CompletableFuture<>();
try (MockedConstruction<NamespacesImpl> ignore = Mockito.mockConstruction(
NamespacesImpl.class, (allocator, context) -> namespaceAdminFuture.complete(allocator))) {
pulsarServices[0].getAdminClient();
}
NamespacesImpl namespaceAdmin = namespaceAdminFuture.get();
// create load report
// namespace 01~09 need to be split
// namespace 08~10 don't need or cannot be split
LoadReport lr = new LoadReport();
lr.setName(lookupAddresses[0]);
lr.setSystemResourceUsage(new SystemResourceUsage());
Map<String, NamespaceBundleStats> bundleStats = new HashMap<String, NamespaceBundleStats>();
bundleStats.put("pulsar/use/primary-ns-01/0x00000000_0x80000000",
newBundleStats(maxTopics + 1, 0, 0, 0, 0, 0, 0));
bundleStats.put("pulsar/use/primary-ns-02/0x00000000_0x80000000",
newBundleStats(2, maxSessions + 1, 0, 0, 0, 0, 0));
bundleStats.put("pulsar/use/primary-ns-03/0x00000000_0x80000000",
newBundleStats(2, 0, maxSessions + 1, 0, 0, 0, 0));
bundleStats.put("pulsar/use/primary-ns-04/0x00000000_0x80000000",
newBundleStats(2, 0, 0, maxMsgRate + 1, 0, 0, 0));
bundleStats.put("pulsar/use/primary-ns-05/0x00000000_0x80000000",
newBundleStats(2, 0, 0, 0, maxMsgRate + 1, 0, 0));
bundleStats.put("pulsar/use/primary-ns-06/0x00000000_0x80000000",
newBundleStats(2, 0, 0, 0, 0, maxBandwidth + 1, 0));
bundleStats.put("pulsar/use/primary-ns-07/0x00000000_0x80000000",
newBundleStats(2, 0, 0, 0, 0, 0, maxBandwidth + 1));
bundleStats.put("pulsar/use/primary-ns-08/0x00000000_0x80000000",
newBundleStats(maxTopics - 1, maxSessions - 1, 1, maxMsgRate - 1, 1, maxBandwidth - 1, 1));
bundleStats.put("pulsar/use/primary-ns-09/0x00000000_0x80000000",
newBundleStats(1, 0, 0, 0, 0, 0, maxBandwidth + 1));
bundleStats.put("pulsar/use/primary-ns-10/0x00000000_0x02000000",
newBundleStats(maxTopics + 1, 0, 0, 0, 0, 0, 0));
lr.setBundleStats(bundleStats);
FieldUtils.writeField(pulsarServices[0].getLoadManager().get(), "lastLoadReport", lr, true);
updateLastReport(pulsarServices[0].getLoadManager().get(), lr);
// sleep to wait load ranking be triggered and trigger bundle split
Thread.sleep(5000);
pulsarServices[0].getLoadManager().get().doNamespaceBundleSplit();
boolean isAutoUnooadSplitBundleEnabled = pulsarServices[0].getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
// verify bundles are split
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-01", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-02", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-03", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-04", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-05", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-06", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, times(1)).splitNamespaceBundle("pulsar/use/primary-ns-07", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-08", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-09", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
verify(namespaceAdmin, never()).splitNamespaceBundle("pulsar/use/primary-ns-10", "0x00000000_0x02000000",
isAutoUnooadSplitBundleEnabled, null);
// disable max session
bundleStats.put("pulsar/use/primary-ns-03/0x00000000_0x80000000",
newBundleStats(2, -1, 0, 0, 0, 0, 0));
verify(namespaceAdmin, times(0)).splitNamespaceBundle("pulsar/use/primary-ns-12", "0x00000000_0x80000000",
isAutoUnooadSplitBundleEnabled, null);
}
/*
* Test all brokers are consistent on current leader and close leader to trigger re-election.
*/
@Test
public void testLeaderElection() throws Exception {
for (int i = 0; i < BROKER_COUNT - 1; i++) {
List<PulsarService> activePulsar = new ArrayList<>();
List<PulsarService> followerPulsar = new ArrayList<>();
LeaderBroker oldLeader = null;
PulsarService leaderPulsar = null;
for (int j = 0; j < BROKER_COUNT; j++) {
PulsarService pulsarService = pulsarServices[j];
if (pulsarService.getState() != PulsarService.State.Closed) {
activePulsar.add(pulsarService);
LeaderElectionService les = pulsarService.getLeaderElectionService();
if (les.isLeader()) {
oldLeader = les.getCurrentLeader().get();
leaderPulsar = pulsarService;
} else {
followerPulsar.add(pulsarService);
}
}
}
// Make sure all brokers see the same leader
log.info("Old leader is : {}", oldLeader.getBrokerId());
for (PulsarService pulsar : activePulsar) {
log.info("Current leader for {} is : {}", pulsar.getWebServiceAddress(), pulsar.getLeaderElectionService().getCurrentLeader());
assertEquals(pulsar.getLeaderElectionService().readCurrentLeader().join(), Optional.of(oldLeader));
}
// Do leader election by killing the leader broker
leaderPulsar.close();
loopUntilLeaderChangesForAllBroker(followerPulsar, oldLeader);
LeaderBroker newLeader = followerPulsar.get(0).getLeaderElectionService().readCurrentLeader().join().get();
log.info("New leader is : {}", newLeader.getBrokerId());
Assert.assertNotEquals(newLeader, oldLeader);
}
}
private void createNamespacePolicies(PulsarService pulsar) throws Exception {
// // prepare three policies for the namespace isolation
NamespaceIsolationPolicies policies = new NamespaceIsolationPolicies();
// set up policy that use this broker as primary
Map<String, String> parameters = new HashMap<>();
parameters.put("min_limit", "1");
parameters.put("usage_threshold", "100");
List<String> allBrokers = new ArrayList<>();
for (int i = 0; i < BROKER_COUNT; i++) {
allBrokers.add(pulsarServices[i].getAdvertisedAddress());
}
NamespaceIsolationData policyData = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("pulsar/use/primary-ns.*"))
.primary(allBrokers)
.secondary(Collections.emptyList())
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters)
.build())
.build();
policies.setPolicy("primaryBrokerPolicy", policyData);
List<String> allExceptFirstBroker = new ArrayList<>();
for (int i = 1; i < BROKER_COUNT; i++) {
allExceptFirstBroker.add(pulsarServices[i].getAdvertisedAddress());
}
// set up policy that use this broker as secondary
policyData = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("pulsar/use/secondary-ns.*"))
.primary(Collections.singletonList(pulsarServices[0].getAdvertisedAddress()))
.secondary(allExceptFirstBroker)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters)
.build())
.build();
policies.setPolicy("secondaryBrokerPolicy", policyData);
// set up policy that do not use this broker (neither primary nor secondary)
policyData = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("pulsar/use/shared-ns.*"))
.primary(Collections.singletonList(pulsarServices[0].getAdvertisedAddress()))
.secondary(allExceptFirstBroker)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters)
.build())
.build();
policies.setPolicy("otherBrokerPolicy", policyData);
try {
pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies().createIsolationData("use",
policies.getPolicies());
} catch (BadVersionException e) {
// isolation policy already exist
pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies().setIsolationData("use",
data -> policies.getPolicies());
}
}
/*
* creates a ResourceDescription where the max limits for different parameters are as below
*
* Memory = 16GB cpu = 100 percent bandwidthIn = 1Gbps bandwidthOut = 1Gbps threads = 100
*/
private PulsarResourceDescription createResourceDescription(long memoryInMB, long cpuPercentage,
long bandwidthInMbps, long bandwidthOutInMbps, long threads) {
long KB = 1024;
long MB = 1024 * KB;
long GB = 1024 * MB;
PulsarResourceDescription rd = new PulsarResourceDescription();
rd.put("memory", new ResourceUsage(memoryInMB, 4 * GB));
rd.put("cpu", new ResourceUsage(cpuPercentage, 100));
rd.put("bandwidthIn", new ResourceUsage(bandwidthInMbps * MB, GB));
rd.put("bandwidthOut", new ResourceUsage(bandwidthOutInMbps * MB, GB));
return rd;
}
}