blob: 79c6bdfbc9d8cf86198b34af39246f8ee04a42e2 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.broker.loadbalance.PlacementStrategy;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage;
import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCacheListener;
import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListener<LoadReport> {
// Class-wide SLF4J logger.
private static final Logger log = LoggerFactory.getLogger(SimpleLoadManagerImpl.class);
// Resource allocation policies; created in initialize().
private SimpleResourceAllocationPolicies policies;
// Owning broker service; assigned as the last step of initialize().
private PulsarService pulsar;
// Average JVM heap usage, in MBytes.
// NOTE(review): the original comment was truncated ("average JVM heap usage for") and this field is not
// used in the visible part of the file — confirm what it averages over.
private long avgJvmHeapUsageMBytes = 0;
// Latest load report received from each broker, keyed by its resource unit.
private Map<ResourceUnit, LoadReport> currentLoadReports;
// Load ranking for each broker, computed from multiple perspectives (usage, loaded and pre-allocated bundles).
private Map<ResourceUnit, ResourceUnitRanking> resourceUnitRankings;
// Brokers bucketed by a single-dimension rank (a TreeMap, so iteration is in ascending rank order).
private AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRankings = new AtomicReference<>();
// Rotation cursor between brokers.
private long brokerRotationCursor = 0;
// Most recently published load balancing metrics.
private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference<>();
// Cache of brokers to be used in applying policies and determining final candidates.
private final Set<String> brokerCandidateCache;
// Other policy selection caches.
private final Set<String> availableBrokersCache;
// Caches for bundle gains and losses.
private final Set<String> bundleGainsCache;
private final Set<String> bundleLossesCache;
// CPU usage per msg/sec
private double realtimeCpuLoadFactor = 0.025;
// Memory usage per group of 500 (topics + producers + consumers).
private double realtimeMemoryLoadFactor = 25.0;
// Realtime average resource quota (the candidate "default" quota).
private ResourceQuota realtimeAvgResourceQuota = null;
// Realtime resource quota per bundle, calculated from the latest load reports.
private AtomicReference<Map<String, ResourceQuota>> realtimeResourceQuotas = new AtomicReference<>();
// Timestamp (ms) when the resource quotas were last re-calculated.
private long lastResourceQuotaUpdateTimestamp = -1;
// Smoothing windows used by timeSmoothValue(): a rising value converges over 30 minutes, a falling one over
// 24 hours (1440 minutes).
public static final long RESOURCE_QUOTA_GO_UP_TIMEWINDOW = TimeUnit.MINUTES.toMillis(30);
public static final long RESOURCE_QUOTA_GO_DOWN_TIMEWINDOW = TimeUnit.MINUTES.toMillis(1440);
// Clamp bounds for the smoothed CPU and memory load factors.
private static final double RESOURCE_QUOTA_MIN_CPU_FACTOR = 0.01;
private static final double RESOURCE_QUOTA_MAX_CPU_FACTOR = 0.1;
private static final double RESOURCE_QUOTA_MIN_MEM_FACTOR = 10;
private static final double RESOURCE_QUOTA_MAX_MEM_FACTOR = 50;
// Lower clamp bounds for smoothed quotas. compareAndWriteQuota() also uses them as the minimum change
// that triggers a ZooKeeper write.
private static final long RESOURCE_QUOTA_MIN_MSGRATE_IN = 5;
private static final long RESOURCE_QUOTA_MIN_MSGRATE_OUT = 5;
private static final long RESOURCE_QUOTA_MIN_BANDWIDTH_IN = 10000;
private static final long RESOURCE_QUOTA_MIN_BANDWIDTH_OUT = 10000;
private static final long RESOURCE_QUOTA_MIN_MEMORY = 2;
// Upper clamp bounds for smoothed quotas. A value of 0 means "no upper bound": timeSmoothValue() only
// applies the maximum when it is > 0.
private static final long RESOURCE_QUOTA_MAX_MSGRATE_IN = 0;
private static final long RESOURCE_QUOTA_MAX_MSGRATE_OUT = 0;
private static final long RESOURCE_QUOTA_MAX_BANDWIDTH_IN = 1000000;
private static final long RESOURCE_QUOTA_MAX_BANDWIDTH_OUT = 1000000;
private static final long RESOURCE_QUOTA_MAX_MEMORY = 200;
private final PlacementStrategy placementStrategy;
// ZooKeeper-backed caches and helpers, all created in initialize().
private ZooKeeperDataCache<LoadReport> loadReportCacheZk;
private ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;
private BrokerHostUsage brokerHostUsage;
private LoadingCache<String, PulsarAdmin> adminCache;
private LoadingCache<String, Long> unloadedHotNamespaceCache;
// ZooKeeper paths of the dynamic (runtime-overridable) load balancer settings.
public static final String LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH = "/loadbalance/settings/strategy";
private static final String LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH = "/loadbalance/settings/load_factor_cpu";
private static final String LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH = "/loadbalance/settings/load_factor_mem";
private static final String LOADBALANCER_DYNAMIC_SETTING_OVERLOAD_THRESHOLD_ZPATH = "/loadbalance/settings/overload_threshold";
private static final String LOADBALANCER_DYNAMIC_SETTING_COMFORT_LOAD_THRESHOLD_ZPATH = "/loadbalance/settings/comfort_load_threshold";
private static final String LOADBALANCER_DYNAMIC_SETTING_UNDERLOAD_THRESHOLD_ZPATH = "/loadbalance/settings/underload_threshold";
private static final String LOADBALANCER_DYNAMIC_SETTING_AUTO_BUNDLE_SPLIT_ENABLED = "/loadbalance/settings/auto_bundle_split_enabled";
// Keys inside the JSON map stored at each dynamic setting path.
private static final String SETTING_NAME_LOAD_FACTOR_CPU = "loadFactorCPU";
private static final String SETTING_NAME_LOAD_FACTOR_MEM = "loadFactorMemory";
private static final String SETTING_NAME_STRATEGY = "loadBalancerStrategy";
private static final String SETTING_NAME_OVERLOAD_THRESHOLD = "overloadThreshold";
private static final String SETTING_NAME_UNDERLOAD_THRESHOLD = "underloadThreshold";
private static final String SETTING_NAME_COMFORTLOAD_THRESHOLD = "comfortLoadThreshold";
private static final String SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED = "autoBundleSplitEnabled";
// Recognised placement strategies; unknown values fall back to weighted random selection.
public static final String LOADBALANCER_STRATEGY_LLS = "leastLoadedServer";
public static final String LOADBALANCER_STRATEGY_RAND = "weightedRandomSelection";
public static final String LOADBALANCER_STRATEGY_LEAST_MSG = "leastMsgPerSecond";
// This broker's znode under LOADBALANCE_BROKERS_ROOT; set in start().
private String brokerZnodePath;
private final ScheduledExecutorService scheduler;
// Children cache of LOADBALANCE_BROKERS_ROOT (one child per registered broker).
private ZooKeeperChildrenCache availableActiveBrokers;
// Bytes per megabyte.
private static final long MBytes = 1024 * 1024;
// update LoadReport at most every 5 seconds
// (NOTE: "MIMIMUM" is misspelled, but the constant is public so the name is kept for compatibility.)
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
// last LoadReport stored in ZK
private LoadReport lastLoadReport;
// last timestamp resource usage was checked
private long lastResourceUsageTimestamp = -1;
// flag to force update load report
private boolean forceLoadReportUpdate = false;
/**
 * Creates a load manager and performs every initialization that does not need a {@link PulsarService}.
 * {@link #initialize(PulsarService)} must be called before the manager is used.
 */
public SimpleLoadManagerImpl() {
    // Single-threaded scheduler (initialize() submits updateRanking to it from a ZooKeeper listener).
    scheduler = Executors.newScheduledThreadPool(1);
    placementStrategy = new WRRPlacementStrategy();

    // Policy-selection caches.
    brokerCandidateCache = new HashSet<>();
    availableBrokersCache = new HashSet<>();
    bundleGainsCache = new HashSet<>();
    bundleLossesCache = new HashSet<>();

    // Ranking, quota and metrics state starts out empty.
    this.currentLoadReports = new HashMap<>();
    this.resourceUnitRankings = new HashMap<>();
    this.sortedRankings.set(new TreeMap<>());
    this.realtimeResourceQuotas.set(new HashMap<>());
    this.realtimeAvgResourceQuota = new ResourceQuota();
    this.loadBalancingMetrics.set(Lists.newArrayList());
}
/**
 * Wires this load manager to a broker service.
 *
 * Picks the host-usage collector for the current OS, creates the allocation policies and the initial
 * LoadReport carrying this broker's service addresses, and builds the caches it depends on:
 * ZooKeeper-backed caches for load reports, dynamic settings and active brokers, a PulsarAdmin cache
 * (admins are closed on eviction), and a cache of recently unloaded hot namespaces.
 *
 * @param pulsar the broker service; stored in {@code this.pulsar} as the very last step
 */
@Override
public void initialize(final PulsarService pulsar) {
    brokerHostUsage = SystemUtils.IS_OS_LINUX
            ? new LinuxBrokerHostUsageImpl(pulsar)
            : new GenericBrokerHostUsageImpl(pulsar);
    this.policies = new SimpleResourceAllocationPolicies(pulsar);
    lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
            pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());

    // Load reports published by each broker, stored as JSON in ZooKeeper.
    loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
        @Override
        public LoadReport deserialize(String key, byte[] content) throws Exception {
            return ObjectMapperFactory.getThreadLocal().readValue(content, LoadReport.class);
        }
    };
    loadReportCacheZk.registerListener(this);

    // Dynamic settings: each ZooKeeper node holds a JSON string->string map.
    this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar.getLocalZkCache()) {
        @Override
        public Map<String, String> deserialize(String key, byte[] content) throws Exception {
            return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
        }
    };

    // One PulsarAdmin per broker URL; evicted clients are closed.
    RemovalListener<String, PulsarAdmin> closeEvictedAdmin = notification -> notification.getValue().close();
    CacheLoader<String, PulsarAdmin> adminLoader = new CacheLoader<String, PulsarAdmin>() {
        @Override
        public PulsarAdmin load(String key) throws Exception {
            // key - broker name already is valid URL, has prefix "http://"
            return new PulsarAdmin(new URL(key), pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                    pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
        }
    };
    adminCache = CacheBuilder.newBuilder()
            .removalListener(closeEvictedAdmin)
            .expireAfterAccess(1, TimeUnit.DAYS)
            .build(adminLoader);

    // Namespaces unloaded for being hot, remembered (with their load time) for the shedding grace period.
    // NOTE(review): if the config getter returns a long, the (int) cast would wrap for huge values — confirm.
    int graceMinutes = (int) pulsar.getConfiguration().getLoadBalancerSheddingGracePeriodMinutes();
    CacheLoader<String, Long> loadTimeLoader = new CacheLoader<String, Long>() {
        @Override
        public Long load(String key) throws Exception {
            return System.currentTimeMillis();
        }
    };
    unloadedHotNamespaceCache = CacheBuilder.newBuilder()
            .expireAfterWrite(graceMinutes, TimeUnit.MINUTES)
            .build(loadTimeLoader);

    // Any change to the set of registered brokers triggers an asynchronous ranking refresh.
    availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(), LOADBALANCE_BROKERS_ROOT);
    availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() {
        @Override
        public void onUpdate(String path, Set<String> data, Stat stat) {
            if (log.isDebugEnabled()) {
                log.debug("Update Received for path {}", path);
            }
            scheduler.submit(SimpleLoadManagerImpl.this::updateRanking);
        }
    });
    this.pulsar = pulsar;
}
/**
 * Convenience constructor: runs the service-independent setup and then initializes against {@code pulsar}.
 *
 * @param pulsar the broker service this load manager belongs to
 */
public SimpleLoadManagerImpl(PulsarService pulsar) {
    this();
    this.initialize(pulsar);
}
/**
 * Registers this broker with the load balancer and primes the load-balancing state.
 *
 * The steps are:
 * <ul>
 * <li>ensure LOADBALANCE_BROKERS_ROOT exists;</li>
 * <li>publish an initial load report as this broker's ephemeral znode (an empty payload if the report could
 * not be generated);</li>
 * <li>compute the first ranking;</li>
 * <li>load the default resource quota and the CPU/memory load factors from ZooKeeper.</li>
 * </ul>
 *
 * @throws PulsarServerException if any step fails; the original failure is the cause
 */
@Override
public void start() throws PulsarServerException {
    try {
        // Register the brokers in zk list
        ServiceConfiguration conf = pulsar.getConfiguration();
        if (pulsar.getZkClient().exists(LOADBALANCE_BROKERS_ROOT, false) == null) {
            try {
                ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), LOADBALANCE_BROKERS_ROOT, new byte[0],
                        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // ignore the exception, node might be present already
            }
        }
        String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + conf.getWebServicePort();
        brokerZnodePath = LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
        LoadReport loadReport = null;
        try {
            loadReport = generateLoadReport();
            this.lastResourceUsageTimestamp = loadReport.getTimestamp();
        } catch (Exception e) {
            // Pass the throwable without a "{}" placeholder so SLF4J logs its stack trace
            // instead of consuming it as a message argument.
            log.warn("Unable to get load report to write it on zookeeper", e);
        }
        String loadReportJson = "";
        if (loadReport != null) {
            loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(loadReport);
        }
        try {
            ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
                    loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            // Catching exception here to print the right error message
            log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
            // Already logged: wrap now so the outer handler rethrows it without logging it a second time.
            throw new PulsarServerException(e);
        }
        // first time, populate the broker ranking
        updateRanking();
        log.info("Created broker ephemeral node on {}", brokerZnodePath);
        // load default resource quota
        this.realtimeAvgResourceQuota = pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota();
        this.lastResourceQuotaUpdateTimestamp = System.currentTimeMillis();
        this.realtimeCpuLoadFactor = getDynamicConfigurationDouble(
                LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH, SETTING_NAME_LOAD_FACTOR_CPU,
                this.realtimeCpuLoadFactor);
        this.realtimeMemoryLoadFactor = getDynamicConfigurationDouble(
                LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH, SETTING_NAME_LOAD_FACTOR_MEM,
                this.realtimeMemoryLoadFactor);
    } catch (PulsarServerException e) {
        throw e;
    } catch (Exception e) {
        // Not necessarily a znode-creation failure (e.g. reading quotas or settings), so use a generic message.
        log.error("Unable to start load manager for broker znode [{}]", brokerZnodePath, e);
        throw new PulsarServerException(e);
    }
}
/**
 * Deletes this broker's znode (created by start()) from under LOADBALANCE_BROKERS_ROOT.
 * Does nothing if no znode path has been set.
 */
@Override
public void disableBroker() throws Exception {
    if (!isNotEmpty(brokerZnodePath)) {
        return;
    }
    // Version -1 deletes regardless of the node's current version.
    pulsar.getZkClient().delete(brokerZnodePath, -1);
}
/**
 * @return the children cache of LOADBALANCE_BROKERS_ROOT, i.e. the registered broker znodes
 */
public ZooKeeperChildrenCache getActiveBrokersCache() {
    return availableActiveBrokers;
}
/**
 * @return the ZooKeeper data cache that deserializes brokers' LoadReport JSON
 */
public ZooKeeperDataCache<LoadReport> getLoadReportCache() {
    return loadReportCacheZk;
}
/**
 * Stores {@code settings} as JSON at {@code zkPath}, creating the node (and its parents) if needed.
 *
 * ZooKeeper failures are logged and swallowed: dynamic settings are best-effort.
 *
 * @param zkPath   dynamic setting node path
 * @param settings key/value settings to store
 * @throws IOException if the settings cannot be serialized to JSON
 */
private void setDynamicConfigurationToZK(String zkPath, Map<String, String> settings) throws IOException {
    byte[] settingBytes = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(settings);
    try {
        if (pulsar.getLocalZkCache().exists(zkPath)) {
            pulsar.getZkClient().setData(zkPath, settingBytes, -1);
        } else {
            try {
                ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), zkPath, settingBytes, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException nodeExists) {
                // The (cached) existence check was stale or the node was created concurrently:
                // overwrite it instead of silently dropping the new settings.
                pulsar.getZkClient().setData(zkPath, settingBytes, -1);
            }
        }
    } catch (Exception e) {
        log.warn("Got exception when writing to ZooKeeper path [{}]:", zkPath, e);
    }
}
/**
 * Reads one setting from the JSON map cached at {@code zkPath}.
 *
 * @return the stored value, or {@code defaultValue} if the node or key is missing or the read fails
 */
private String getDynamicConfigurationFromZK(String zkPath, String settingName, String defaultValue) {
    String value = defaultValue;
    try {
        value = dynamicConfigurationCache.get(zkPath)
                .map(settings -> settings.get(settingName))
                .orElse(defaultValue);
    } catch (Exception e) {
        log.warn("Got exception when reading ZooKeeper path [{}]:", zkPath, e);
    }
    return value;
}
/**
 * Reads a numeric dynamic setting.
 *
 * @return the parsed value, or {@code defaultValue} if the setting is absent or not a valid double
 */
private double getDynamicConfigurationDouble(String zkPath, String settingName, double defaultValue) {
    try {
        String raw = this.getDynamicConfigurationFromZK(zkPath, settingName, null);
        return (raw == null) ? defaultValue : Double.parseDouble(raw);
    } catch (Exception e) {
        log.warn("Got exception when parsing configuration from ZooKeeper path [{}]:", zkPath, e);
        return defaultValue;
    }
}
/**
 * Reads a boolean dynamic setting.
 *
 * @return {@code defaultValue} if the setting is absent; otherwise true only for "true" (case-insensitive)
 */
private boolean getDynamicConfigurationBoolean(String zkPath, String settingName, boolean defaultValue) {
    try {
        String raw = this.getDynamicConfigurationFromZK(zkPath, settingName, null);
        return (raw == null) ? defaultValue : Boolean.parseBoolean(raw);
    } catch (Exception e) {
        log.warn("Got exception when parsing configuration from ZooKeeper path [{}]:", zkPath, e);
        return defaultValue;
    }
}
/**
 * Resolves the effective placement strategy.
 *
 * The ZooKeeper dynamic setting overrides the broker configuration. Any unrecognised or missing value falls
 * back to weighted random selection.
 */
private String getLoadBalancerPlacementStrategy() {
    String configured = pulsar.getConfiguration().getLoadBalancerPlacementStrategy();
    String candidate = this.getDynamicConfigurationFromZK(LOADBALANCER_DYNAMIC_SETTING_STRATEGY_ZPATH,
            SETTING_NAME_STRATEGY, configured);
    boolean recognised = LOADBALANCER_STRATEGY_LLS.equals(candidate)
            || LOADBALANCER_STRATEGY_RAND.equals(candidate)
            || LOADBALANCER_STRATEGY_LEAST_MSG.equals(candidate);
    return recognised ? candidate : LOADBALANCER_STRATEGY_RAND;
}
/** Returns the CPU load factor stored in ZooKeeper, or {@code defaultValue} if absent or unparsable. */
private double getCpuLoadFactorFromZK(double defaultValue) {
    final String path = LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH;
    return this.getDynamicConfigurationDouble(path, SETTING_NAME_LOAD_FACTOR_CPU, defaultValue);
}
/** Returns the memory load factor stored in ZooKeeper, or {@code defaultValue} if absent or unparsable. */
private double getMemoryLoadFactorFromZK(double defaultValue) {
    final String path = LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH;
    return this.getDynamicConfigurationDouble(path, SETTING_NAME_LOAD_FACTOR_MEM, defaultValue);
}
/**
 * @return true when the effective placement strategy is leastLoadedServer or leastMsgPerSecond,
 *         false for weightedRandomSelection
 */
@Override
public boolean isCentralized() {
    String strategy = this.getLoadBalancerPlacementStrategy();
    return strategy.equals(LOADBALANCER_STRATEGY_LLS) || strategy.equals(LOADBALANCER_STRATEGY_LEAST_MSG);
}
/** Underload threshold (percent): the ZooKeeper override if present, else the broker configuration, truncated. */
private long getLoadBalancerBrokerUnderloadedThresholdPercentage() {
    double configured = pulsar.getConfiguration().getLoadBalancerBrokerUnderloadedThresholdPercentage();
    double effective = this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_UNDERLOAD_THRESHOLD_ZPATH,
            SETTING_NAME_UNDERLOAD_THRESHOLD, configured);
    return (long) effective;
}
/** Overload threshold (percent): the ZooKeeper override if present, else the broker configuration, truncated. */
private long getLoadBalancerBrokerOverloadedThresholdPercentage() {
    double configured = pulsar.getConfiguration().getLoadBalancerBrokerOverloadedThresholdPercentage();
    double effective = this.getDynamicConfigurationDouble(LOADBALANCER_DYNAMIC_SETTING_OVERLOAD_THRESHOLD_ZPATH,
            SETTING_NAME_OVERLOAD_THRESHOLD, configured);
    return (long) effective;
}
/** Comfort-load threshold (percent): the ZooKeeper override if present, else the broker configuration, truncated. */
private long getLoadBalancerBrokerComfortLoadThresholdPercentage() {
    double configured = pulsar.getConfiguration().getLoadBalancerBrokerComfortLoadLevelPercentage();
    double effective = this.getDynamicConfigurationDouble(
            LOADBALANCER_DYNAMIC_SETTING_COMFORT_LOAD_THRESHOLD_ZPATH, SETTING_NAME_COMFORTLOAD_THRESHOLD,
            configured);
    return (long) effective;
}
/** Whether automatic bundle splitting is enabled: the ZooKeeper override if present, else the broker configuration. */
private boolean getLoadBalancerAutoBundleSplitEnabled() {
    boolean configured = pulsar.getConfiguration().getLoadBalancerAutoBundleSplitEnabled();
    return this.getDynamicConfigurationBoolean(LOADBALANCER_DYNAMIC_SETTING_AUTO_BUNDLE_SPLIT_ENABLED,
            SETTING_NAME_AUTO_BUNDLE_SPLIT_ENABLED, configured);
}
/*
 * Temporary glue code to make load balancing work; to be removed in the future.
 * Copies the non-null system resource usage dimensions of a LoadReport into a PulsarResourceDescription.
 */
private PulsarResourceDescription fromLoadReport(LoadReport report) {
    SystemResourceUsage usage = report.getSystemResourceUsage();
    PulsarResourceDescription description = new PulsarResourceDescription();
    if (usage.bandwidthIn != null) {
        description.put("bandwidthIn", usage.bandwidthIn);
    }
    if (usage.bandwidthOut != null) {
        description.put("bandwidthOut", usage.bandwidthOut);
    }
    if (usage.memory != null) {
        description.put("memory", usage.memory);
    }
    if (usage.cpu != null) {
        description.put("cpu", usage.cpu);
    }
    return description;
}
/**
 * Returns the realtime quota for {@code bundle}.
 *
 * On a miss, the quota is loaded from the ZooKeeper resource-quota cache and memoized in the current
 * realtime quota map.
 */
private ResourceQuota getResourceQuota(String bundle) {
    Map<String, ResourceQuota> cached = this.realtimeResourceQuotas.get();
    if (cached.containsKey(bundle)) {
        return cached.get(bundle);
    }
    ResourceQuota loaded = pulsar.getLocalZkCacheService().getResourceQuotaCache().getQuota(bundle);
    cached.put(bundle, loaded);
    return loaded;
}
/**
 * Sums the realtime quotas of the given namespace bundles.
 *
 * @param bundles bundle names
 * @return a new ResourceQuota holding the sum
 */
private ResourceQuota getTotalAllocatedQuota(Set<String> bundles) {
    ResourceQuota sum = new ResourceQuota();
    bundles.forEach(bundleName -> sum.add(this.getResourceQuota(bundleName)));
    return sum;
}
/**
 * Moves {@code oldValue} toward a clamped {@code newSample}, weighting the sample by elapsed time.
 *
 * The sample is first clamped to [minValue, maxValue]; there is no upper bound when maxValue <= 0. The
 * sample's weight is timePast divided by RESOURCE_QUOTA_GO_UP_TIMEWINDOW when the value rises, or by
 * RESOURCE_QUOTA_GO_DOWN_TIMEWINDOW when it falls, capped to [0, 1]. Because the rise window is shorter,
 * increases are absorbed faster than decreases.
 *
 * @param timePast elapsed milliseconds since the previous update (negative values give weight 0)
 */
private double timeSmoothValue(double oldValue, double newSample, double minValue, double maxValue, long timePast) {
    double clamped = Math.max(minValue, newSample);
    if (maxValue > 0) {
        clamped = Math.min(maxValue, clamped);
    }
    double weight;
    if (clamped >= oldValue) {
        weight = (double) timePast / RESOURCE_QUOTA_GO_UP_TIMEWINDOW;
    } else if (clamped < oldValue) {
        weight = (double) timePast / RESOURCE_QUOTA_GO_DOWN_TIMEWINDOW;
    } else {
        // Unordered comparison (a NaN is involved): keep the old value unchanged.
        weight = 0.0;
    }
    weight = Math.min(1, Math.max(0, weight));
    return (1 - weight) * oldValue + weight * clamped;
}
/**
 * Applies timeSmoothValue() to every dimension of a dynamic quota, using the RESOURCE_QUOTA_MIN_* and
 * RESOURCE_QUOTA_MAX_* bounds.
 *
 * @return a new smoothed quota, or {@code oldQuota} itself when it is not dynamic
 */
private ResourceQuota timeSmoothQuota(ResourceQuota oldQuota, double msgRateIn, double msgRateOut,
        double bandwidthIn, double bandwidthOut, double memory, long timePast) {
    if (!oldQuota.getDynamic()) {
        return oldQuota;
    }
    double smoothRateIn = timeSmoothValue(oldQuota.getMsgRateIn(), msgRateIn, RESOURCE_QUOTA_MIN_MSGRATE_IN,
            RESOURCE_QUOTA_MAX_MSGRATE_IN, timePast);
    double smoothRateOut = timeSmoothValue(oldQuota.getMsgRateOut(), msgRateOut, RESOURCE_QUOTA_MIN_MSGRATE_OUT,
            RESOURCE_QUOTA_MAX_MSGRATE_OUT, timePast);
    double smoothBwIn = timeSmoothValue(oldQuota.getBandwidthIn(), bandwidthIn, RESOURCE_QUOTA_MIN_BANDWIDTH_IN,
            RESOURCE_QUOTA_MAX_BANDWIDTH_IN, timePast);
    double smoothBwOut = timeSmoothValue(oldQuota.getBandwidthOut(), bandwidthOut,
            RESOURCE_QUOTA_MIN_BANDWIDTH_OUT, RESOURCE_QUOTA_MAX_BANDWIDTH_OUT, timePast);
    double smoothMemory = timeSmoothValue(oldQuota.getMemory(), memory, RESOURCE_QUOTA_MIN_MEMORY,
            RESOURCE_QUOTA_MAX_MEMORY, timePast);

    ResourceQuota smoothed = new ResourceQuota();
    smoothed.setMsgRateIn(smoothRateIn);
    smoothed.setMsgRateOut(smoothRateOut);
    smoothed.setBandwidthIn(smoothBwIn);
    smoothed.setBandwidthOut(smoothBwOut);
    smoothed.setMemory(smoothMemory);
    return smoothed;
}
/**
 * Recomputes the realtime load factors and resource quotas from the current load reports.
 *
 * The method works in three passes:
 * <ol>
 * <li>Aggregate bundle counts, memory groups, bandwidth, message rates and CPU/memory usage across all reports,
 * then smooth the CPU and memory load factors (only once there is enough traffic).</li>
 * <li>Smooth the average per-bundle quota (only once there are more than 30 bundles).</li>
 * <li>Build a fresh per-bundle quota map from each bundle's stats and replace realtimeResourceQuotas with
 * it. Bundles no longer reported are dropped.</li>
 * </ol>
 * Does nothing when there are no load reports.
 */
private synchronized void updateRealtimeResourceQuota() {
// Objects (topics + producers + consumers) per memory group.
long memObjectGroupSize = 500;
if (!currentLoadReports.isEmpty()) {
long totalBundles = 0;
long totalMemGroups = 0;
double totalMsgRateIn = 0.0;
double totalMsgRateOut = 0.0;
double totalMsgRate = 0.0;
double totalCpuUsage = 0.0;
double totalMemoryUsage = 0.0;
double totalBandwidthIn = 0.0;
double totalBandwidthOut = 0.0;
long loadReportTimestamp = -1;
// update resource factors
for (Map.Entry<ResourceUnit, LoadReport> entry : currentLoadReports.entrySet()) {
LoadReport loadReport = entry.getValue();
// Track the newest report timestamp; it becomes the new "last update" time below.
if (loadReport.getTimestamp() > loadReportTimestamp) {
loadReportTimestamp = loadReport.getTimestamp();
}
Map<String, NamespaceBundleStats> bundleStats = loadReport.getBundleStats();
// NOTE(review): this `continue` also skips adding this report's msg rates and CPU/memory usage below;
// confirm that reports without bundle stats should be excluded from the totals entirely.
if (bundleStats == null) {
continue;
}
for (Map.Entry<String, NamespaceBundleStats> statsEntry : bundleStats.entrySet()) {
totalBundles++;
NamespaceBundleStats stats = statsEntry.getValue();
// Integer division: every bundle counts at least one group, plus one per full 500 objects.
totalMemGroups += (1
+ (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize);
totalBandwidthIn += stats.msgThroughputIn;
totalBandwidthOut += stats.msgThroughputOut;
}
SystemResourceUsage resUsage = loadReport.getSystemResourceUsage();
totalMsgRateIn += loadReport.getMsgRateIn();
totalMsgRateOut += loadReport.getMsgRateOut();
// NOTE(review): fromLoadReport() null-checks sru.cpu / sru.memory, but they are dereferenced unguarded
// here — confirm they are always present in reports that carry bundle stats.
totalCpuUsage = totalCpuUsage + resUsage.getCpu().usage;
totalMemoryUsage = totalMemoryUsage + resUsage.getMemory().usage;
}
totalMsgRate = totalMsgRateIn + totalMsgRateOut;
long timePast = loadReportTimestamp - this.lastResourceQuotaUpdateTimestamp;
this.lastResourceQuotaUpdateTimestamp = loadReportTimestamp;
// Only re-estimate the load factors when there is enough traffic / objects for the ratio to be meaningful.
if (totalMsgRate > 1000 && totalMemGroups > 30) {
this.realtimeCpuLoadFactor = timeSmoothValue(this.realtimeCpuLoadFactor, totalCpuUsage / totalMsgRate,
RESOURCE_QUOTA_MIN_CPU_FACTOR, RESOURCE_QUOTA_MAX_CPU_FACTOR, timePast);
this.realtimeMemoryLoadFactor = timeSmoothValue(this.realtimeMemoryLoadFactor,
totalMemoryUsage / totalMemGroups, RESOURCE_QUOTA_MIN_MEM_FACTOR, RESOURCE_QUOTA_MAX_MEM_FACTOR,
timePast);
}
// calculate average bundle
// NOTE(review): the average memory quota here is total memory / bundles, while per-bundle quotas below use
// memGroupCount * realtimeMemoryLoadFactor — confirm the two are meant to differ.
if (totalBundles > 30 && this.realtimeAvgResourceQuota.getDynamic()) {
ResourceQuota oldQuota = this.realtimeAvgResourceQuota;
ResourceQuota newQuota = timeSmoothQuota(oldQuota, totalMsgRateIn / totalBundles,
totalMsgRateOut / totalBundles, totalBandwidthIn / totalBundles,
totalBandwidthOut / totalBundles, totalMemoryUsage / totalBundles, timePast);
this.realtimeAvgResourceQuota = newQuota;
}
// update realtime quota for each bundle
Map<String, ResourceQuota> newQuotas = new HashMap<>();
for (Map.Entry<ResourceUnit, LoadReport> entry : currentLoadReports.entrySet()) {
ResourceUnit resourceUnit = entry.getKey();
LoadReport loadReport = entry.getValue();
Map<String, NamespaceBundleStats> bundleStats = loadReport.getBundleStats();
if (bundleStats == null) {
continue;
}
for (Map.Entry<String, NamespaceBundleStats> statsEntry : bundleStats.entrySet()) {
String bundle = statsEntry.getKey();
NamespaceBundleStats stats = statsEntry.getValue();
long memGroupCount = (1
+ (stats.topics + stats.producerCount + stats.consumerCount) / memObjectGroupSize);
double newMemoryQuota = memGroupCount * this.realtimeMemoryLoadFactor;
// Previous quota comes from the current realtime map, falling back to ZooKeeper.
ResourceQuota oldQuota = getResourceQuota(bundle);
ResourceQuota newQuota = timeSmoothQuota(oldQuota, stats.msgRateIn, stats.msgRateOut,
stats.msgThroughputIn, stats.msgThroughputOut, newMemoryQuota, timePast);
newQuotas.put(bundle, newQuota);
}
}
// Publish the new map atomically; bundles absent from the current reports are dropped.
this.realtimeResourceQuotas.set(newQuotas);
}
}
/**
 * Persists {@code newQuota} to ZooKeeper when it differs meaningfully from {@code oldQuota}.
 *
 * Nothing is written if the stored quota's dynamic flag is off. Nothing is written either if every dimension
 * changed by less than its RESOURCE_QUOTA_MIN_* threshold, which avoids churning ZooKeeper with
 * insignificant updates.
 *
 * @param bundle   bundle name, or {@code null} to write the default quota
 * @param oldQuota quota currently stored
 * @param newQuota freshly computed quota
 * @throws Exception propagated from the resource quota cache write
 */
private void compareAndWriteQuota(String bundle, ResourceQuota oldQuota, ResourceQuota newQuota) throws Exception {
    if (!oldQuota.getDynamic()) {
        return;
    }
    boolean insignificantChange =
            Math.abs(newQuota.getMsgRateIn() - oldQuota.getMsgRateIn()) < RESOURCE_QUOTA_MIN_MSGRATE_IN
            && Math.abs(newQuota.getMsgRateOut() - oldQuota.getMsgRateOut()) < RESOURCE_QUOTA_MIN_MSGRATE_OUT
            // Compare bandwidthIn with bandwidthIn (it was previously compared against bandwidthOut).
            && Math.abs(newQuota.getBandwidthIn() - oldQuota.getBandwidthIn()) < RESOURCE_QUOTA_MIN_BANDWIDTH_IN
            && Math.abs(newQuota.getBandwidthOut() - oldQuota.getBandwidthOut()) < RESOURCE_QUOTA_MIN_BANDWIDTH_OUT
            && Math.abs(newQuota.getMemory() - oldQuota.getMemory()) < RESOURCE_QUOTA_MIN_MEMORY;
    if (insignificantChange) {
        return;
    }
    log.info(String.format(
            "Update quota %s - msgRateIn: %.1f, msgRateOut: %.1f, bandwidthIn: %.1f, bandwidthOut: %.1f, memory: %.1f",
            (bundle == null) ? "default" : bundle, newQuota.getMsgRateIn(), newQuota.getMsgRateOut(),
            newQuota.getBandwidthIn(), newQuota.getBandwidthOut(), newQuota.getMemory()));
    if (bundle == null) {
        pulsar.getLocalZkCacheService().getResourceQuotaCache().setDefaultQuota(newQuota);
    } else {
        pulsar.getLocalZkCacheService().getResourceQuotaCache().setQuota(bundle, newQuota);
    }
}
/**
 * Publishes the realtime load factors and resource quotas computed by this (leader) broker to ZooKeeper.
 *
 * The CPU and memory load factors are written as dynamic settings. The default quota and each bundle's
 * quota are then written through compareAndWriteQuota(), which skips insignificant changes.
 *
 * @throws Exception if serializing a setting or writing a quota fails
 */
@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
    log.info("Writing namespace bundle resource quotas to ZooKeeper as leader broker");
    // Write the load factors. singletonMap replaces double-brace initialization, which created anonymous
    // HashMap subclasses holding a hidden reference to this load manager.
    setDynamicConfigurationToZK(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_CPU_ZPATH,
            Collections.singletonMap(SETTING_NAME_LOAD_FACTOR_CPU, Double.toString(realtimeCpuLoadFactor)));
    setDynamicConfigurationToZK(LOADBALANCER_DYNAMIC_SETTING_LOAD_FACTOR_MEM_ZPATH,
            Collections.singletonMap(SETTING_NAME_LOAD_FACTOR_MEM, Double.toString(realtimeMemoryLoadFactor)));
    // write default quota
    ResourceQuota defaultQuota = pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota();
    this.compareAndWriteQuota(null, defaultQuota, this.realtimeAvgResourceQuota);
    // write each bundle's quota
    Map<String, ResourceQuota> quotas = this.realtimeResourceQuotas.get();
    for (Map.Entry<String, ResourceQuota> entry : quotas.entrySet()) {
        String bundle = entry.getKey();
        ResourceQuota oldQuota = pulsar.getLocalZkCacheService().getResourceQuotaCache().getQuota(bundle);
        this.compareAndWriteQuota(bundle, oldQuota, entry.getValue());
    }
}
/**
 * Recomputes the per-broker rankings from the latest load reports.
 *
 * For every reporting broker, a ResourceUnitRanking is built from:
 * <ul>
 * <li>its system resource usage;</li>
 * <li>its loaded bundles;</li>
 * <li>its still-pending pre-allocated bundles (bundles that now appear as loaded are removed from the
 * pre-allocated set).</li>
 * </ul>
 *
 * Each broker is then bucketed under a single rank that depends on the placement strategy:
 * <ul>
 * <li>leastLoadedServer: the estimated load percentage;</li>
 * <li>leastMsgPerSecond: the estimated message rate;</li>
 * <li>weightedRandomSelection (any other value): the estimated maximum capacity, in bundles, scaled by the
 * squared idle ratio.</li>
 * </ul>
 *
 * The new sorted ranking and per-broker rankings replace the previous ones. If no load reports are
 * available, the previous rankings are kept. Load-balancing metrics are refreshed for the resource unit
 * whose id contains this broker's advertised address.
 */
private synchronized void doLoadRanking() {
    ResourceUnitRanking.setCpuUsageByMsgRate(this.realtimeCpuLoadFactor);
    String hostname = pulsar.getAdvertisedAddress();
    String strategy = this.getLoadBalancerPlacementStrategy();
    log.info("doLoadRanking - load balancing strategy: {}", strategy);
    if (currentLoadReports.isEmpty()) {
        log.info("Leader broker[{}] No ResourceUnits to rank this run, Using Old Ranking",
                pulsar.getWebServiceAddress());
        return;
    }

    Map<Long, Set<ResourceUnit>> rankBuckets = Maps.newTreeMap();
    Map<ResourceUnit, ResourceUnitRanking> freshRankings = new HashMap<>();
    for (Map.Entry<ResourceUnit, LoadReport> reportEntry : currentLoadReports.entrySet()) {
        ResourceUnit unit = reportEntry.getKey();
        LoadReport report = reportEntry.getValue();

        Set<String> loadedBundles = report.getBundles();
        Set<String> pendingBundles;
        if (!resourceUnitRankings.containsKey(unit)) {
            pendingBundles = new HashSet<>();
        } else {
            // Bundles the broker now reports as loaded are no longer pending pre-allocation.
            pendingBundles = resourceUnitRankings.get(unit).getPreAllocatedBundles();
            pendingBundles.removeAll(loadedBundles);
        }
        ResourceQuota loadedQuota = getTotalAllocatedQuota(loadedBundles);
        ResourceQuota pendingQuota = getTotalAllocatedQuota(pendingBundles);
        ResourceUnitRanking ranking = new ResourceUnitRanking(report.getSystemResourceUsage(), loadedBundles,
                loadedQuota, pendingBundles, pendingQuota);
        freshRankings.put(unit, ranking);

        double loadPercentage = ranking.getEstimatedLoadPercentage();
        long maxCapacity = ranking
                .estimateMaxCapacity(pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota());
        long rank;
        if (strategy.equals(LOADBALANCER_STRATEGY_LLS)) {
            rank = (long) loadPercentage;
        } else if (strategy.equals(LOADBALANCER_STRATEGY_LEAST_MSG)) {
            rank = (long) ranking.getEstimatedMessageRate();
        } else {
            // NOTE(review): if loadPercentage can exceed 100, idleRatio turns negative and squaring it makes an
            // overloaded broker look less loaded — confirm estimateMaxCapacity() compensates.
            double idleRatio = (100 - loadPercentage) / 100;
            rank = (long) (maxCapacity * idleRatio * idleRatio);
        }
        rankBuckets.computeIfAbsent(rank, r -> new HashSet<>()).add(unit);
        if (log.isDebugEnabled()) {
            log.debug("Added Resource Unit [{}] with Rank [{}]", unit.getResourceId(), rank);
        }

        if (unit.getResourceId().contains(hostname)) {
            updateLoadBalancingMetrics(hostname, rank, ranking);
        }
    }
    this.sortedRankings.set(rankBuckets);
    this.resourceUnitRankings = freshRankings;
}
/**
 * @return the most recently published load-balancing metrics (the stored list itself, not a copy)
 */
public List<Metrics> getLoadBalancingMetrics() {
    return this.loadBalancingMetrics.get();
}
/**
 * Replaces the published load-balancing metrics with a single entry for {@code hostname}.
 * The entry carries the broker's rank and its allocated-quota percentages.
 */
private void updateLoadBalancingMetrics(String hostname, long finalRank, ResourceUnitRanking ranking) {
    Map<String, String> dims = new HashMap<>();
    dims.put("broker", hostname);

    Metrics brokerMetrics = Metrics.create(dims);
    brokerMetrics.put("brk_lb_load_rank", finalRank);
    brokerMetrics.put("brk_lb_quota_pct_cpu", ranking.getAllocatedLoadPercentageCPU());
    brokerMetrics.put("brk_lb_quota_pct_memory", ranking.getAllocatedLoadPercentageMemory());
    brokerMetrics.put("brk_lb_quota_pct_bandwidth_in", ranking.getAllocatedLoadPercentageBandwidthIn());
    brokerMetrics.put("brk_lb_quota_pct_bandwidth_out", ranking.getAllocatedLoadPercentageBandwidthOut());

    List<Metrics> published = Lists.newArrayList();
    published.add(brokerMetrics);
    this.loadBalancingMetrics.set(published);
}
/**
* Assign an owner for the specified ServiceUnit from the given candidates, following these principles:
*
* 1) Optimum distribution: fill up one broker until its load reaches the optimum level (defined by the underload
* threshold) before pulling another idle broker in.
* 2) Even distribution: once all brokers' loads are above the optimum level, keep all brokers evenly loaded.
* 3) Set the underload threshold to a small value (like 1) for pure even distribution, and to a high value (like
* 80) for pure optimum distribution.
*
* Strategy to select a broker:
* 1) The first choice is the least loaded broker that is underloaded but not idle.
* 2) The second choice is an idle broker (if there is any).
* 3) Otherwise, select the least loaded broker if it is NOT overloaded.
* 4) If all brokers are overloaded, select the broker with the maximum available capacity. Since brokers can have
* different hardware configurations, this usually means selecting the broker with more hardware resources.
*
* Broker's load level:
* 1) Load ranking (triggered by a LoadReport update) estimates the load level from the resource usage and the
* namespace bundles already loaded by each broker.
* 2) When the leader broker decides the owner for a new namespace bundle, it may take time for the real owner to
* actually load the bundle and refresh its LoadReport. Meanwhile the leader broker stores the bundle in a list
* called preAllocatedBundles, and the quota of all preAllocatedBundles in preAllocatedQuotas. It then
* re-estimates the broker's load level by taking the preAllocatedQuota into account.
* 3) preAllocatedBundles and preAllocatedQuotas are recalculated during load ranking: bundles that a broker now
* reports as loaded are removed from its preAllocatedBundles.
*/
private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, ResourceUnit> candidates,
ServiceUnitId serviceUnit) {
long underloadThreshold = this.getLoadBalancerBrokerUnderloadedThresholdPercentage();
long overloadThreshold = this.getLoadBalancerBrokerOverloadedThresholdPercentage();
ResourceQuota defaultQuota = pulsar.getLocalZkCacheService().getResourceQuotaCache().getDefaultQuota();
double minLoadPercentage = 101.0;
long maxAvailability = -1;
ResourceUnit idleRU = null;
ResourceUnit maxAvailableRU = null;
ResourceUnit randomRU = null;
ResourceUnit selectedRU = null;
ResourceUnitRanking selectedRanking = null;
String serviceUnitId = serviceUnit.toString();
// If the ranking is expected to be in the range [0,100] (which is the case for LOADBALANCER_STRATEGY_LLS),
// the ranks are bounded. Otherwise (as is the case in LOADBALANCER_STRATEGY_LEAST_MSG, the ranks are simply
// the total message rate which is in the range [0,Infinity) so they are unbounded. The
// "boundedness" affects how two ranks are compared to see which one is better
boolean unboundedRanks = getLoadBalancerPlacementStrategy().equals(LOADBALANCER_STRATEGY_LEAST_MSG);
long randomBrokerIndex = (candidates.size() > 0) ? (this.brokerRotationCursor % candidates.size()) : 0;
// find the least loaded & not-idle broker
for (Map.Entry<Long, ResourceUnit> candidateOwner : candidates.entries()) {
ResourceUnit candidate = candidateOwner.getValue();
randomBrokerIndex--;
// skip broker which is not ranked. this should never happen except in unit test
if (!resourceUnitRankings.containsKey(candidate)) {
continue;
}
// check if this ServiceUnit is already pre-allocated
String resourceUnitId = candidate.getResourceId();
ResourceUnitRanking ranking = resourceUnitRankings.get(candidate);
if (ranking.isServiceUnitPreAllocated(serviceUnitId)) {
return candidate;
}
// check if this ServiceUnit is already loaded
if (ranking.isServiceUnitLoaded(serviceUnitId)) {
ranking.removeLoadedServiceUnit(serviceUnitId, this.getResourceQuota(serviceUnitId));
}
// record a random broker
if (randomBrokerIndex < 0 && randomRU == null) {
randomRU = candidate;
}
// check the available capacity
double loadPercentage = ranking.getEstimatedLoadPercentage();
double availablePercentage = Math.max(0, (100 - loadPercentage) / 100);
long availability = (long) (ranking.estimateMaxCapacity(defaultQuota) * availablePercentage);
if (availability > maxAvailability) {
maxAvailability = availability;
maxAvailableRU = candidate;
}
// check the load percentage
if (ranking.isIdle()) {
if (idleRU == null) {
idleRU = candidate;
}
} else {
if (selectedRU == null) {
selectedRU = candidate;
selectedRanking = ranking;
minLoadPercentage = loadPercentage;
} else {
if ((unboundedRanks ? ranking.compareMessageRateTo(selectedRanking)
: ranking.compareTo(selectedRanking)) < 0) {
minLoadPercentage = loadPercentage;
selectedRU = candidate;
selectedRanking = ranking;
}
}
}
}
if ((minLoadPercentage > underloadThreshold && idleRU != null) || selectedRU == null) {
// assigned to idle broker is the least loaded broker already have optimum load (which means NOT
// underloaded), or all brokers are idle
selectedRU = idleRU;
} else if (minLoadPercentage >= 100.0 && randomRU != null && !unboundedRanks) {
// all brokers are full, assign to a random one
selectedRU = randomRU;
} else if (minLoadPercentage > overloadThreshold && !unboundedRanks) {
// assign to the broker with maximum available capacity if all brokers are overloaded
selectedRU = maxAvailableRU;
}
// re-calculate load level for selected broker
if (selectedRU != null) {
this.brokerRotationCursor = (this.brokerRotationCursor + 1) % 1000000;
ResourceUnitRanking ranking = resourceUnitRankings.get(selectedRU);
String loadPercentageDesc = ranking.getEstimatedLoadPercentageString();
log.info("Assign {} to {} with ({}).", serviceUnitId, selectedRU.getResourceId(), loadPercentageDesc);
if (!ranking.isServiceUnitPreAllocated(serviceUnitId)) {
ResourceQuota quota = this.getResourceQuota(serviceUnitId);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
}
return selectedRU;
}
/**
 * Filters the available brokers through the namespace policies and returns the remaining brokers keyed by rank.
 *
 * <p>Broker identities are compared without the {@code http://} prefix. If applying the policies fails, the failure
 * is logged and all available brokers are returned unfiltered so that placement can still proceed.
 *
 * @param serviceUnit the service unit being placed
 * @param availableBrokers candidate brokers grouped by rank
 * @return a new multimap of rank to eligible broker; never null
 */
private Multimap<Long, ResourceUnit> getFinalCandidates(ServiceUnitId serviceUnit,
        Map<Long, Set<ResourceUnit>> availableBrokers) {
    // The two scratch caches are shared fields, so the whole computation runs under one lock.
    synchronized (brokerCandidateCache) {
        final Multimap<Long, ResourceUnit> result = TreeMultimap.create();
        availableBrokersCache.clear();
        for (final Set<ResourceUnit> resourceUnits : availableBrokers.values()) {
            for (final ResourceUnit resourceUnit : resourceUnits) {
                availableBrokersCache.add(resourceUnit.getResourceId().replace("http://", ""));
            }
        }
        brokerCandidateCache.clear();
        try {
            LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache);
        } catch (Exception e) {
            // The exception is the trailing argument with no placeholder of its own, so SLF4J logs its stack trace.
            log.warn("Error when trying to apply policies for {}, falling back to all available brokers",
                    serviceUnit, e);
            for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
                result.putAll(entry.getKey(), entry.getValue());
            }
            return result;
        }
        // After LoadManagerShared is finished applying the filter, put the results back into a multimap.
        for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
            final Long rank = entry.getKey();
            for (final ResourceUnit resourceUnit : entry.getValue()) {
                if (brokerCandidateCache.contains(resourceUnit.getResourceId().replace("http://", ""))) {
                    result.put(rank, resourceUnit);
                }
            }
        }
        return result;
    }
}
/**
 * Picks the broker that should own the given service unit, considering all currently available brokers.
 */
public ResourceUnit getLeastLoaded(ServiceUnitId serviceUnit) throws Exception {
    final Map<Long, Set<ResourceUnit>> brokersByRank = getAvailableBrokers(serviceUnit);
    return getLeastLoadedBroker(serviceUnit, brokersByRank);
}
/**
 * Returns the policy-filtered candidate brokers, keyed by rank, for the given service unit.
 */
public Multimap<Long, ResourceUnit> getResourceAvailabilityFor(ServiceUnitId serviceUnitId) throws Exception {
    final Map<Long, Set<ResourceUnit>> brokersByRank = getAvailableBrokers(serviceUnitId);
    return getFinalCandidates(serviceUnitId, brokersByRank);
}
/**
 * Returns the brokers eligible for placement, grouped by rank.
 *
 * <p>Normally this is the latest ranking computed from the brokers' load reports. When that ranking is empty, every
 * broker listed under {@code LOADBALANCE_BROKERS_ROOT} is returned under rank {@code 0} with an empty
 * {@code PulsarResourceDescription}.
 *
 * @param serviceUnitId the service unit being placed (not used by the current implementation)
 * @return brokers keyed by rank; empty if there is neither a ranking nor any active broker
 * @throws Exception if the active-broker list cannot be read
 */
private Map<Long, Set<ResourceUnit>> getAvailableBrokers(ServiceUnitId serviceUnitId) throws Exception {
    Map<Long, Set<ResourceUnit>> availableBrokers = sortedRankings.get();
    if (!availableBrokers.isEmpty()) {
        // Normal case: ranking information derived from load reports is available.
        return availableBrokers;
    }
    // Fallback: no ranking yet, so create a map with all active brokers and no load information. The brokers are
    // collected in a sorted set, so the iteration order of the active-broker list has no effect on the result
    // (a shuffle previously done here was therefore discarded).
    final Set<String> activeBrokers = availableActiveBrokers.get(LOADBALANCE_BROKERS_ROOT);
    availableBrokers = Maps.newTreeMap();
    for (String broker : activeBrokers) {
        ResourceUnit resourceUnit = new SimpleResourceUnit(String.format("http://%s", broker),
                new PulsarResourceDescription());
        availableBrokers.computeIfAbsent(0L, key -> Sets.newTreeSet()).add(resourceUnit);
    }
    log.info("Choosing at random from broker list: [{}]", availableBrokers.values());
    return availableBrokers;
}
/**
 * Chooses the broker that should own the given service unit.
 *
 * <p>Candidates are the available brokers filtered by the namespace policies, minus any broker absent from the
 * active-broker list (this filtering is best-effort: if the active list cannot be read, a warning is logged and the
 * unfiltered candidates are used). For the least-loaded-server and least-message-rate strategies the choice is made
 * by {@code findBrokerForPlacement}; for any other strategy it is delegated to {@code placementStrategy}.
 *
 * @param serviceUnit the service unit being placed
 * @param availableBrokers candidate brokers grouped by rank
 * @return the selected broker, or null if no candidate remains or the strategy selects none
 */
private ResourceUnit getLeastLoadedBroker(ServiceUnitId serviceUnit,
        Map<Long, Set<ResourceUnit>> availableBrokers) {
    Multimap<Long, ResourceUnit> finalCandidates = getFinalCandidates(serviceUnit, availableBrokers);
    // Remove candidates that point to inactive brokers
    try {
        final Set<String> activeBrokers = availableActiveBrokers.get();
        finalCandidates.entries().removeIf(candidate -> !activeBrokers
                .contains(candidate.getValue().getResourceId().replace("http://", "")));
    } catch (Exception e) {
        log.warn("Error during attempt to remove inactive brokers while searching for least active broker", e);
    }
    if (finalCandidates.isEmpty()) {
        // No available broker found
        log.warn("No broker available to acquire service unit: [{}]", serviceUnit);
        return null;
    }
    final String strategy = this.getLoadBalancerPlacementStrategy();
    final ResourceUnit selectedBroker;
    if (strategy.equals(LOADBALANCER_STRATEGY_LLS) || strategy.equals(LOADBALANCER_STRATEGY_LEAST_MSG)) {
        selectedBroker = findBrokerForPlacement(finalCandidates, serviceUnit);
    } else {
        selectedBroker = placementStrategy.findBrokerForPlacement(finalCandidates);
    }
    // findBrokerForPlacement returns null when none of the candidates is ranked; guard before dereferencing.
    if (selectedBroker == null) {
        log.warn("No broker selected for service unit: [{}] among candidates {}", serviceUnit,
                finalCandidates.values());
        return null;
    }
    log.info("Selected : [{}] for ServiceUnit : [{}]", selectedBroker.getResourceId(),
            serviceUnit.getNamespaceObject().toString());
    return selectedBroker;
}
/*
 * Invoked when a broker's load report changes. No ranking work is done on the calling thread: a re-ranking
 * (updateRanking) is submitted to the scheduler for every notification.
 *
 * Ideally only the report at the updated path would be re-ranked, but for now all reports are re-read and all
 * rankings recalculated.
 */
@Override
public void onUpdate(String path, LoadReport data, Stat stat) {
    log.debug("Received updated load report from broker node - [{}], scheduling re-ranking of brokers.", path);
    final Runnable reRankTask = () -> updateRanking();
    scheduler.submit(reRankTask);
}
/**
 * Rebuilds {@code currentLoadReports} from the cached load reports of all active brokers, then refreshes the realtime
 * resource quotas and recomputes the broker rankings, all while holding the {@code currentLoadReports} lock.
 *
 * <p>A broker whose report cannot be read (including a missing report node) is skipped with a warning. Any other
 * failure, such as being unable to read the active-broker list, is logged and aborts the re-ranking. Because the
 * map is cleared first, such a failure can leave it empty.
 */
private void updateRanking() {
    try {
        synchronized (currentLoadReports) {
            currentLoadReports.clear();
            Set<String> activeBrokers = availableActiveBrokers.get();
            for (String broker : activeBrokers) {
                final String key = String.format("%s/%s", LOADBALANCE_BROKERS_ROOT, broker);
                try {
                    LoadReport lr = loadReportCacheZk.get(key)
                            .orElseThrow(() -> new KeeperException.NoNodeException(key));
                    ResourceUnit ru = new SimpleResourceUnit(String.format("http://%s", lr.getName()),
                            fromLoadReport(lr));
                    this.currentLoadReports.put(ru, lr);
                } catch (Exception e) {
                    // Trailing exception without a placeholder: SLF4J logs the stack trace.
                    log.warn("Error reading load report from Cache for broker - [{}]", broker, e);
                }
            }
            updateRealtimeResourceQuota();
            doLoadRanking();
        }
    } catch (Exception e) {
        log.warn("Error reading active brokers list from zookeeper while re-ranking load reports", e);
    }
}
/**
 * Returns true when any one of bandwidth-out, bandwidth-in, CPU or memory usage is strictly above the given
 * percentage threshold.
 */
public static boolean isAboveLoadLevel(SystemResourceUsage usage, float thresholdPercentage) {
    if (usage.bandwidthOut.percentUsage() > thresholdPercentage) {
        return true;
    }
    if (usage.bandwidthIn.percentUsage() > thresholdPercentage) {
        return true;
    }
    if (usage.cpu.percentUsage() > thresholdPercentage) {
        return true;
    }
    return usage.memory.percentUsage() > thresholdPercentage;
}
/**
 * Returns true only when bandwidth-out, bandwidth-in, CPU and memory usage are all strictly below the given
 * percentage threshold.
 */
public static boolean isBelowLoadLevel(SystemResourceUsage usage, float thresholdPercentage) {
    final boolean bandwidthBelow = usage.bandwidthOut.percentUsage() < thresholdPercentage
            && usage.bandwidthIn.percentUsage() < thresholdPercentage;
    return bandwidthBelow
            && usage.cpu.percentUsage() < thresholdPercentage
            && usage.memory.percentUsage() < thresholdPercentage;
}
/**
 * Returns the JVM heap currently in use (total minus free memory) divided by {@code MBytes}, or 0 when the computed
 * usage is not positive.
 */
private static long getRealtimeJvmHeapUsageMBytes() {
    final Runtime runtime = Runtime.getRuntime();
    final long usedBytes = runtime.totalMemory() - runtime.freeMemory();
    return (usedBytes > 0L) ? usedBytes / MBytes : 0L;
}
/**
 * Returns the smoothed JVM heap usage once one has been recorded (positive value), otherwise falls back to the
 * current realtime reading.
 */
private long getAverageJvmHeapUsageMBytes() {
    final long average = this.avgJvmHeapUsageMBytes;
    return (average > 0) ? average : getRealtimeJvmHeapUsageMBytes();
}
/**
 * Collects host resource usage and replaces its memory figure with the (smoothed) JVM heap usage in MB.
 */
private SystemResourceUsage getSystemResourceUsage() throws IOException {
    final SystemResourceUsage usage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
    usage.memory.usage = (double) getAverageJvmHeapUsageMBytes();
    return usage;
}
/**
 * Generates a load report for this broker.
 *
 * <p>If the previous report was generated no more than {@code LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL} milliseconds ago,
 * that report is returned as-is. Otherwise a new report is built containing the system resource usage (and the
 * derived overloaded/underloaded flags), the per-bundle stats, the bundles gained and lost since the previous
 * report, the quota allocated to the owned bundles, and the quota of bundles pre-allocated to this broker in the
 * local rankings but not owned yet.
 *
 * @return the new report, or the previous one if it is still recent enough
 * @throws Exception if any part of the report cannot be collected (the failure is also logged)
 */
@Override
public LoadReport generateLoadReport() throws Exception {
    synchronized (bundleGainsCache) {
        // lastLoadReport can still be null before the first report is written (writeLoadReportOnZookeeper
        // handles that case and then calls this method).
        if (lastLoadReport != null && System.currentTimeMillis()
                - lastLoadReport.getTimestamp() <= LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL) {
            return lastLoadReport;
        }
        try {
            LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
                    pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
            loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
                    pulsar.getConfiguration().getWebServicePort()));
            SystemResourceUsage systemResourceUsage = this.getSystemResourceUsage();
            loadReport.setOverLoaded(isAboveLoadLevel(systemResourceUsage,
                    this.getLoadBalancerBrokerOverloadedThresholdPercentage()));
            loadReport.setUnderLoaded(isBelowLoadLevel(systemResourceUsage,
                    this.getLoadBalancerBrokerUnderloadedThresholdPercentage()));
            loadReport.setSystemResourceUsage(systemResourceUsage);
            loadReport.setBundleStats(pulsar.getBrokerService().getBundleStats());
            loadReport.setTimestamp(System.currentTimeMillis());

            final Set<String> oldBundles = (lastLoadReport != null) ? lastLoadReport.getBundles()
                    : Collections.<String>emptySet();
            final Set<String> newBundles = loadReport.getBundles();
            bundleGainsCache.clear();
            bundleLossesCache.clear();
            for (String oldBundle : oldBundles) {
                if (!newBundles.contains(oldBundle)) {
                    bundleLossesCache.add(oldBundle);
                }
            }
            for (String newBundle : newBundles) {
                if (!oldBundles.contains(newBundle)) {
                    bundleGainsCache.add(newBundle);
                }
            }
            // The caches are cleared and refilled on every call; give the report its own copies so reports that
            // were already handed out do not change underneath their readers.
            loadReport.setBundleGains(new HashSet<>(bundleGainsCache));
            loadReport.setBundleLosses(new HashSet<>(bundleLossesCache));

            final ResourceQuota allocatedQuota = getTotalAllocatedQuota(newBundles);
            loadReport.setAllocatedCPU(
                    (allocatedQuota.getMsgRateIn() + allocatedQuota.getMsgRateOut()) * realtimeCpuLoadFactor);
            loadReport.setAllocatedMemory(allocatedQuota.getMemory());
            loadReport.setAllocatedBandwidthIn(allocatedQuota.getBandwidthIn());
            loadReport.setAllocatedBandwidthOut(allocatedQuota.getBandwidthOut());
            loadReport.setAllocatedMsgRateIn(allocatedQuota.getMsgRateIn());
            loadReport.setAllocatedMsgRateOut(allocatedQuota.getMsgRateOut());

            final ResourceUnit resourceUnit = new SimpleResourceUnit(
                    String.format("http://%s", loadReport.getName()), fromLoadReport(loadReport));
            // Bundles pre-allocated to this broker that it does not own yet. Work on a copy so that building a
            // report never mutates the ranking's own pre-allocation set.
            final Set<String> preAllocatedBundles = new HashSet<>();
            if (resourceUnitRankings.containsKey(resourceUnit)) {
                preAllocatedBundles.addAll(resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles());
                preAllocatedBundles.removeAll(newBundles);
            }
            final ResourceQuota preAllocatedQuota = getTotalAllocatedQuota(preAllocatedBundles);
            loadReport.setPreAllocatedCPU(
                    (preAllocatedQuota.getMsgRateIn() + preAllocatedQuota.getMsgRateOut()) * realtimeCpuLoadFactor);
            loadReport.setPreAllocatedMemory(preAllocatedQuota.getMemory());
            loadReport.setPreAllocatedBandwidthIn(preAllocatedQuota.getBandwidthIn());
            loadReport.setPreAllocatedBandwidthOut(preAllocatedQuota.getBandwidthOut());
            loadReport.setPreAllocatedMsgRateIn(preAllocatedQuota.getMsgRateIn());
            loadReport.setPreAllocatedMsgRateOut(preAllocatedQuota.getMsgRateOut());
            return loadReport;
        } catch (Exception e) {
            log.error("Failed to generate LoadReport for broker, reason [{}]", e.getMessage(), e);
            throw e;
        }
    }
}
/**
 * Requests that the next call to writeLoadReportOnZookeeper() publishes a report unconditionally.
 */
@Override
public void setLoadReportForceUpdateFlag() {
    forceLoadReportUpdate = true;
}
/**
 * Publishes this broker's load report to its ZooKeeper node when an update is warranted, then runs the hot-bundle
 * split check.
 *
 * <p>On every call, the smoothed JVM heap usage ({@code avgJvmHeapUsageMBytes}) is first updated with the current
 * reading. A new report is then written when:
 * <ol>
 * <li>no report has been written yet, or an update was forced via {@code setLoadReportForceUpdateFlag()};</li>
 * <li>the last report is older than {@code loadBalancerReportUpdateMaxIntervalMinutes};</li>
 * <li>more than {@code LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL} ms have passed and the number of owned bundles changed;
 * or</li>
 * <li>more than {@code LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL} ms have passed, the host-usage check interval has
 * elapsed, and the change in some resource's usage, relative to its limit, exceeds
 * {@code loadBalancerReportUpdateThresholdPercentage}.</li>
 * </ol>
 *
 * @throws Exception if the report cannot be generated, serialized or written, or if bundle splitting fails
 */
@Override
public void writeLoadReportOnZookeeper() throws Exception {
    // Update the weighted moving average of JVM heap usage; the weight is sized so that it roughly spans the last
    // 120 seconds of samples taken every LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL.
    long realtimeJvmHeapUsage = getRealtimeJvmHeapUsageMBytes();
    if (this.avgJvmHeapUsageMBytes <= 0) {
        this.avgJvmHeapUsageMBytes = realtimeJvmHeapUsage;
    } else {
        long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL);
        this.avgJvmHeapUsageMBytes = ((weight - 1) * this.avgJvmHeapUsageMBytes + realtimeJvmHeapUsage) / weight;
    }
    boolean needUpdate = false;
    if (lastLoadReport == null || this.forceLoadReportUpdate) {
        needUpdate = true;
        this.forceLoadReportUpdate = false;
    } else {
        long timestampNow = System.currentTimeMillis();
        long timeElapsedSinceLastReport = timestampNow - lastLoadReport.getTimestamp();
        int maxUpdateIntervalInMinutes = pulsar.getConfiguration().getLoadBalancerReportUpdateMaxIntervalMinutes();
        if (timeElapsedSinceLastReport > TimeUnit.MINUTES.toMillis(maxUpdateIntervalInMinutes)) {
            needUpdate = true;
        } else if (timeElapsedSinceLastReport > LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL) {
            // Any change in the number of owned bundles triggers an update.
            long oldBundleCount = lastLoadReport.getNumBundles();
            long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles();
            if (newBundleCount != oldBundleCount) {
                needUpdate = true;
            }
            // check resource usage comparing with last LoadReport
            if (!needUpdate && timestampNow - this.lastResourceUsageTimestamp > TimeUnit.MINUTES
                    .toMillis(pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes())) {
                SystemResourceUsage oldUsage = lastLoadReport.getSystemResourceUsage();
                SystemResourceUsage newUsage = this.getSystemResourceUsage();
                this.lastResourceUsageTimestamp = timestampNow;
                double cpuChange = percentageOfLimitChange(newUsage.cpu.usage, oldUsage.cpu.usage,
                        newUsage.cpu.limit);
                double memChange = percentageOfLimitChange(newUsage.memory.usage, oldUsage.memory.usage,
                        newUsage.memory.limit);
                double directMemChange = percentageOfLimitChange(newUsage.directMemory.usage,
                        oldUsage.directMemory.usage, newUsage.directMemory.limit);
                double bandwidthOutChange = percentageOfLimitChange(newUsage.bandwidthOut.usage,
                        oldUsage.bandwidthOut.usage, newUsage.bandwidthOut.limit);
                double bandwidthInChange = percentageOfLimitChange(newUsage.bandwidthIn.usage,
                        oldUsage.bandwidthIn.usage, newUsage.bandwidthIn.limit);
                double maxChange = Math.max(Math.abs(cpuChange), Math.abs(memChange));
                maxChange = Math.max(maxChange, Math.abs(directMemChange));
                maxChange = Math.max(maxChange, Math.abs(bandwidthOutChange));
                maxChange = Math.max(maxChange, Math.abs(bandwidthInChange));
                long resourceChange = (long) Math.min(100.0, maxChange);
                if (resourceChange > pulsar.getConfiguration().getLoadBalancerReportUpdateThresholdPercentage()) {
                    needUpdate = true;
                    log.info("LoadReport update triggered by change on resource usage, delta ({}).",
                            String.format(
                                    "cpu: %.1f%%, mem: %.1f%%, directMemory: %.1f%%, bandwidthIn: %.1f%%, bandwidthOut: %.1f%%",
                                    cpuChange, memChange, directMemChange, bandwidthInChange, bandwidthOutChange));
                }
            }
        }
    }
    if (needUpdate) {
        LoadReport lr = generateLoadReport();
        pulsar.getZkClient().setData(brokerZnodePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr),
                -1);
        this.lastLoadReport = lr;
        this.lastResourceUsageTimestamp = lr.getTimestamp();
        // split-bundle if requires
        doNamespaceBundleSplit();
    }
}

/**
 * Returns the change from {@code oldUsage} to {@code newUsage} as a percentage of {@code limit}, or 0 when the limit
 * is not positive.
 */
private static double percentageOfLimitChange(double newUsage, double oldUsage, double limit) {
    return (limit > 0) ? ((newUsage - oldUsage) * 100 / limit) : 0;
}
/**
 * Checks whether at least one policy-eligible candidate broker for the given bundle reports every tracked resource
 * strictly below {@code maxLoadLevel} percent, i.e. could take the bundle over if it were unloaded.
 *
 * <p>TODO: this can be optimized; the policies are re-applied to all ranked brokers on every call.
 *
 * @param bundleName full bundle name, from which the namespace is derived
 * @param maxLoadLevel load percentage every resource of a candidate must be below
 * @return true if such a candidate exists
 */
private boolean isBrokerAvailableForRebalancing(String bundleName, long maxLoadLevel) {
    NamespaceName namespaceName = new NamespaceName(LoadManagerShared.getNamespaceNameFromBundleName(bundleName));
    Map<Long, Set<ResourceUnit>> availableBrokers = sortedRankings.get();
    Multimap<Long, ResourceUnit> brokers = getFinalCandidates(namespaceName, availableBrokers);
    for (ResourceUnit candidate : brokers.values()) {
        LoadReport candidateReport = currentLoadReports.get(candidate);
        // A ranked candidate may have no current report (presumably the rankings and reports were refreshed at
        // different times); it cannot be assessed, so skip it instead of failing with an NPE.
        if (candidateReport != null && isBelowLoadLevel(candidateReport.getSystemResourceUsage(), maxLoadLevel)) {
            return true;
        }
    }
    return false;
}
/**
 * Tells whether bundle unloading must be skipped during load shedding.
 *
 * <p>Unloading is skipped when load balancing is disabled in the broker configuration, or when the flag node
 * {@code AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH} exists in global ZooKeeper. If the flag cannot be
 * read, a warning is logged and unloading is allowed.
 *
 * @return true if unloading is disabled; false (the default) if unloading is allowed
 */
public boolean isUnloadDisabledInLoadShedding() {
    if (!pulsar.getConfiguration().isLoadBalancerEnabled()) {
        return true;
    }
    try {
        return pulsar.getGlobalZkCache().exists(AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH);
    } catch (Exception e) {
        log.warn("Unable to fetch contents of [{}] from global zookeeper",
                AdminResource.LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH, e);
        return false;
    }
}
/**
 * Unloads each given bundle from its overloaded broker, unless the bundle was unloaded recently (it is still in
 * {@code unloadedHotNamespaceCache}) or unloading is disabled, in which case only a dry-run message is logged.
 *
 * <p>Both real and dry-run unloads record the bundle in the cache, so the same bundle is not considered again until
 * its cache entry goes away. Failures for one bundle are logged and do not stop the others.
 *
 * @param namespaceBundlesToUnload overloaded broker mapped to the bundle to unload from it
 */
private void unloadNamespacesFromOverLoadedBrokers(Map<ResourceUnit, String> namespaceBundlesToUnload) {
    for (Map.Entry<ResourceUnit, String> bundle : namespaceBundlesToUnload.entrySet()) {
        String brokerName = bundle.getKey().getResourceId();
        String bundleName = bundle.getValue();
        try {
            // The cached value is the time (epoch millis) of the last unload attempt for this bundle.
            Long lastUnloadedAt = unloadedHotNamespaceCache.getIfPresent(bundleName);
            if (lastUnloadedAt != null) {
                // we can't unload this namespace so move to next one
                log.info("Can't unload Namespace {} because it was unloaded last at {} and unload interval has "
                        + "not exceeded.", bundleName, LocalDateTime.ofInstant(
                                java.time.Instant.ofEpochMilli(lastUnloadedAt), java.time.ZoneId.systemDefault()));
                continue;
            }
            if (!isUnloadDisabledInLoadShedding()) {
                log.info("Unloading namespace {} from overloaded broker {}", bundleName, brokerName);
                adminCache.get(brokerName).namespaces().unloadNamespaceBundle(
                        LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
                        LoadManagerShared.getBundleRangeFromBundleName(bundleName));
                log.info("Successfully unloaded namespace {} from broker {}", bundleName, brokerName);
            } else {
                log.info("DRY RUN: Unload in Load Shedding is disabled. Namespace {} would have been "
                        + "unloaded from overloaded broker {} otherwise.", bundleName, brokerName);
            }
            unloadedHotNamespaceCache.put(bundleName, System.currentTimeMillis());
        } catch (Exception e) {
            log.warn("ERROR failed to unload the bundle {} from overloaded broker {}", bundleName, brokerName, e);
        }
    }
}
/**
 * For every broker whose current load report is above the overload threshold, selects its top bundle (first entry of
 * the bundle stats sorted by the broker's bottleneck resource) for unloading, provided another eligible broker is
 * below the comfort load level. Brokers owning a single bundle are only warned about. The selected bundles are then
 * unloaded via {@code unloadNamespacesFromOverLoadedBrokers}.
 */
@Override
public void doLoadShedding() {
    long overloadThreshold = this.getLoadBalancerBrokerOverloadedThresholdPercentage();
    long comfortLoadLevel = this.getLoadBalancerBrokerComfortLoadThresholdPercentage();
    log.info("Running load shedding task as leader broker, overload threshold {}, comfort loadlevel {}",
            overloadThreshold, comfortLoadLevel);
    // overloadedRU --> bundleName
    Map<ResourceUnit, String> namespaceBundlesToBeUnloaded = new HashMap<>();
    synchronized (currentLoadReports) {
        for (Map.Entry<ResourceUnit, LoadReport> entry : currentLoadReports.entrySet()) {
            ResourceUnit overloadedRU = entry.getKey();
            LoadReport lr = entry.getValue();
            if (!isAboveLoadLevel(lr.getSystemResourceUsage(), overloadThreshold)) {
                continue;
            }
            ResourceType bottleneckResourceType = lr.getBottleneckResourceType();
            Map<String, NamespaceBundleStats> bundleStats = lr.getSortedBundleStats(bottleneckResourceType);
            if (bundleStats.isEmpty()) {
                continue;
            }
            // Only the first (top) bundle of the sorted stats is considered for each overloaded broker.
            Map.Entry<String, NamespaceBundleStats> topBundle = bundleStats.entrySet().iterator().next();
            String bundleName = topBundle.getKey();
            if (bundleStats.size() == 1) {
                // can't unload the only namespace bundle, just issue a warning message
                log.warn(
                        "HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. "
                                + "No Load Shedding will be done on this broker",
                        bundleName, overloadedRU.getResourceId());
                continue;
            }
            NamespaceBundleStats stats = topBundle.getValue();
            // We need at least one underloaded RU from list of candidates that can host this bundle
            if (isBrokerAvailableForRebalancing(bundleName, comfortLoadLevel)) {
                log.info(
                        "Namespace bundle {} will be unloaded from overloaded broker {}, bundle stats (topics: {}, producers {}, "
                                + "consumers {}, bandwidthIn {}, bandwidthOut {})",
                        bundleName, overloadedRU.getResourceId(), stats.topics, stats.producerCount,
                        stats.consumerCount, stats.msgThroughputIn, stats.msgThroughputOut);
                namespaceBundlesToBeUnloaded.put(overloadedRU, bundleName);
            } else {
                log.info("Unable to shed load from broker {}, no brokers with enough capacity available "
                        + "for re-balancing {}", overloadedRU.getResourceId(), bundleName);
            }
        }
    }
    unloadNamespacesFromOverLoadedBrokers(namespaceBundlesToBeUnloaded);
}
/**
 * Detects and splits hot namespace bundles owned by this broker, based on the bundle stats of the last load report.
 *
 * <p>A bundle is hot when its topic count, producer+consumer count, total message rate or total throughput exceeds
 * the configured maximum. It is split only if it has more than one topic and its namespace (counting splits already
 * planned in this pass) is below the configured maximum bundle count. If automatic splitting is disabled, the
 * decision is only logged (dry run). After any split attempt a load report update is forced.
 *
 * @throws Exception if the bundle count of a namespace cannot be read
 */
@Override
public void doNamespaceBundleSplit() throws Exception {
    int maxBundleCount = pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles();
    long maxBundleTopics = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxTopics();
    long maxBundleSessions = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxSessions();
    long maxBundleMsgRate = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate();
    long maxBundleBandwidth = pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
    log.info(
            "Running namespace bundle split with thresholds: topics {}, sessions {}, msgRate {}, bandwidth {}, maxBundles {}",
            maxBundleTopics, maxBundleSessions, maxBundleMsgRate, maxBundleBandwidth, maxBundleCount);
    if (this.lastLoadReport == null || this.lastLoadReport.getBundleStats() == null) {
        return;
    }
    Map<String, NamespaceBundleStats> bundleStats = this.lastLoadReport.getBundleStats();
    Set<String> bundlesToBeSplit = new HashSet<>();
    // Splits already decided in this pass, per namespace, so several hot bundles of one namespace cannot push it
    // past maxBundleCount (assuming each split adds one bundle to its namespace).
    Map<String, Integer> plannedSplitsPerNamespace = new HashMap<>();
    for (Map.Entry<String, NamespaceBundleStats> statsEntry : bundleStats.entrySet()) {
        String bundleName = statsEntry.getKey();
        NamespaceBundleStats stats = statsEntry.getValue();
        long totalSessions = stats.consumerCount + stats.producerCount;
        double totalMsgRate = stats.msgRateIn + stats.msgRateOut;
        double totalBandwidth = stats.msgThroughputIn + stats.msgThroughputOut;
        boolean needSplit = false;
        if (stats.topics > maxBundleTopics || totalSessions > maxBundleSessions || totalMsgRate > maxBundleMsgRate
                || totalBandwidth > maxBundleBandwidth) {
            if (stats.topics <= 1) {
                log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName);
            } else {
                String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                int numBundles = pulsar.getNamespaceService().getBundleCount(new NamespaceName(namespace))
                        + plannedSplitsPerNamespace.getOrDefault(namespace, 0);
                if (numBundles >= maxBundleCount) {
                    log.info("Unable to split hot namespace bundle {} since the namespace has too many bundles.",
                            bundleName);
                } else {
                    needSplit = true;
                    plannedSplitsPerNamespace.merge(namespace, 1, Integer::sum);
                }
            }
        }
        if (needSplit) {
            if (this.getLoadBalancerAutoBundleSplitEnabled()) {
                log.info(
                        "Will split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}",
                        bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
                bundlesToBeSplit.add(bundleName);
            } else {
                log.info(
                        "DRY RUN - split hot namespace bundle {}, topics {}, producers+consumers {}, msgRate in+out {}, bandwidth in+out {}",
                        bundleName, stats.topics, totalSessions, totalMsgRate, totalBandwidth);
            }
        }
    }
    if (!bundlesToBeSplit.isEmpty()) {
        for (String bundleName : bundlesToBeSplit) {
            try {
                pulsar.getAdminClient().namespaces().splitNamespaceBundle(
                        LoadManagerShared.getNamespaceNameFromBundleName(bundleName),
                        LoadManagerShared.getBundleRangeFromBundleName(bundleName));
                log.info("Successfully split namespace bundle {}", bundleName);
            } catch (Exception e) {
                log.error("Failed to split namespace bundle {}", bundleName, e);
            }
        }
        this.setLoadReportForceUpdateFlag();
    }
}
/**
 * Stops the load manager. This implementation has nothing to release, so the call is a no-op.
 */
@Override
public void stop() throws PulsarServerException {
    // Intentionally empty.
}
}