The modular load manager, implemented in ModularLoadManagerImpl, is a flexible alternative to the previously implemented load manager. It attempts to simplify how load is managed while also providing abstractions so that complex load management strategies may be implemented.
The load management component determines the criteria for unloading bundles and provides the following load shedding strategies: OverloadShedder, ThresholdShedder, and UniformLoadShedder (the default is ThresholdShedder since 2.10.0).
OverloadShedder: This strategy attempts to shed exactly one bundle on brokers which are overloaded.
ThresholdShedder: This strategy unloads bundles from any broker that exceeds the average resource utilization of all brokers by a configured threshold.
UniformLoadShedder: This strategy tends to distribute load uniformly across all brokers.
However, there is only one bundle placement strategy: LeastLongTermMessageRate, which selects the broker with the least long-term message rate.
The load management in our Pulsar cluster uses ThresholdShedder as the load shedding strategy and LeastLongTermMessageRate as the bundle placement strategy, and this combination does not work well. Some broker nodes have a high load when the traffic of some topics is relatively large. The load shedding strategy unloads some bundles from any broker that exceeds the average resource utilization of all brokers by a configured threshold, and those bundles are transferred to the next broker node. However, this causes the load of the next broker node to exceed the average resource utilization, so load balancing is triggered again, this time on the broker that received the bundles. Worse yet, this scenario keeps recurring.
The load shedding strategy configuration is as follows:

    # load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
    loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

    # The broker resource usage threshold.
    # When the broker resource usage is greater than the pulsar cluster average resource usage,
    # the threshold shedder will be triggered to offload bundles from the broker.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerBrokerThresholdShedderPercentage=10

    # When calculating new resource usage, the history usage accounts for.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerHistoryResourcePercentage=0.9

    # The BandWithIn usage weight when calculating new resource usage.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerBandwithInResourceWeight=1.0

    # The BandWithOut usage weight when calculating new resource usage.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerBandwithOutResourceWeight=1.0

    # The CPU usage weight when calculating new resource usage.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerCPUResourceWeight=1.0

    # The heap memory usage weight when calculating new resource usage.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerMemoryResourceWeight=0.1

    # The direct memory usage weight when calculating new resource usage.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerDirectMemoryResourceWeight=0.1

    # Bundle unload minimum throughput threshold (MB), avoiding bundle unload frequently.
    # It only takes effect in the ThresholdShedder strategy.
    loadBalancerBundleUnloadMinThroughputThreshold=0.1
The following screenshots show the status of the cluster.
Problem 1. Load balancing took a long time: about 10 hours and more than 400 load-balancing operations, and unloading keeps happening for as long as traffic is heavy.
Problem 2. CPU usage is poorly balanced across brokers.
The load shedding strategy ThresholdShedder works well, but the bundle placement strategy LeastLongTermMessageRate does not. There are 3 possible reasons for the problems.
Some brokers with lower load but more bundles cannot become candidates because LoadManager forces bundles to be distributed evenly across brokers. Most brokers are filtered out by this rule, so only 1 or 2 of the 136 brokers in total remain as candidates. This was fixed by #16059.
The memory usage of Java programs fluctuates widely, so the maximum resource usage that is calculated is based on memory usage most of the time, which filters out brokers with low CPU load. Below is a sample of the JVM memory usage of two brokers in the cluster. If a broker is overloaded, it gets the highest score, which prevents it from being a candidate.
The bundle placement strategy is LeastLongTermMessageRate, which selects the broker with the least long-term message rate rather than selecting by a load metric. LeastLongTermMessageRate does not cooperate well with ThresholdShedder. Therefore, a load-based bundle placement strategy is necessary to work together with ThresholdShedder.
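The second reason can be illustrated with a minimal sketch. The class and method names below are hypothetical and are not Pulsar's actual code; the sketch only assumes that a broker's usage is taken as the maximum over its per-resource usage, optionally multiplied by per-resource weights such as loadBalancerCPUResourceWeight=1.0 and loadBalancerMemoryResourceWeight=0.1. The sample usage figures are invented for illustration.

```java
import java.util.Map;

/** Hypothetical illustration; not Pulsar's actual classes. */
final class MaxUsageSketch {

    /**
     * Returns the largest weighted usage (in percent) over all resources.
     * Resources without an explicit weight count fully (weight 1.0).
     */
    static double maxUsage(Map<String, Double> usagePct, Map<String, Double> weights) {
        double max = 0.0;
        for (Map.Entry<String, Double> e : usagePct.entrySet()) {
            double weight = weights.getOrDefault(e.getKey(), 1.0);
            max = Math.max(max, e.getValue() * weight);
        }
        return max;
    }

    public static void main(String[] args) {
        // Invented sample values: one broker with idle CPU but a full heap,
        // one broker with busy CPU but a mostly empty heap.
        Map<String, Double> idleCpu = Map.of("cpu", 20.0, "memory", 85.0);
        Map<String, Double> busyCpu = Map.of("cpu", 70.0, "memory", 40.0);

        Map<String, Double> noWeights = Map.of();
        Map<String, Double> weights = Map.of("cpu", 1.0, "memory", 0.1);

        // Without weights the idle-CPU broker looks like the more loaded one.
        System.out.println(maxUsage(idleCpu, noWeights) + " vs " + maxUsage(busyCpu, noWeights));
        // With weights the comparison follows CPU usage instead.
        System.out.println(maxUsage(idleCpu, weights) + " vs " + maxUsage(busyCpu, weights));
    }
}
```

Without weights, heap usage dominates the maximum and the broker with idle CPU ranks as the busier of the two; with a small memory weight, the ranking follows CPU usage.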
The ThresholdShedder strategy unloads bundles from any broker that exceeds the average resource utilization of all brokers by a configured threshold. As a consequence, this strategy tends to distribute load among all brokers. It does this by first computing the average resource usage per broker for the whole cluster. The resource usage for each broker is calculated using the method LocalBrokerData#getMaxResourceUsageWithWeight. The weights for each resource are configurable. Historical observations are included in the running average based on the broker's setting for loadBalancerHistoryResourcePercentage. Once the average resource usage is calculated, a broker's current/historical usage is compared to the average broker usage. If a broker's usage is greater than the average usage per broker plus the loadBalancerBrokerThresholdShedderPercentage, this load shedder proposes removing enough bundles to bring the unloaded broker 5% below the current average broker usage. Note that recently unloaded bundles are not unloaded again.
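The history smoothing and threshold check described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the ThresholdShedder source: the class and method names are hypothetical, usage values are percentages that already had the resource weights applied, and the exponential-average formula is an assumed reading of how loadBalancerHistoryResourcePercentage blends old and new observations.

```java
/** Hypothetical illustration; not Pulsar's actual classes. */
final class ThresholdCheckSketch {

    /**
     * Blends the previous smoothed usage with the current observation.
     * Assumed formula: history * p + (1 - p) * current, where p plays the
     * role of loadBalancerHistoryResourcePercentage.
     * The first observation is used as-is when there is no history yet.
     */
    static double smooth(Double previous, double current, double historyPercentage) {
        if (previous == null) {
            return current;
        }
        return previous * historyPercentage + (1 - historyPercentage) * current;
    }

    /**
     * True when the broker's usage is greater than the cluster average plus the
     * threshold (the role of loadBalancerBrokerThresholdShedderPercentage).
     */
    static boolean exceedsThreshold(double brokerUsagePct, double avgUsagePct, double thresholdPct) {
        return brokerUsagePct > avgUsagePct + thresholdPct;
    }

    public static void main(String[] args) {
        // Invented sample values: history of 40%, a spike to 80%, p = 0.9.
        double smoothed = smooth(40.0, 80.0, 0.9);
        System.out.println("smoothed usage: " + smoothed);

        // With a 40% cluster average and a threshold of 10, only usage above 50% sheds.
        System.out.println("55% sheds: " + exceedsThreshold(55.0, 40.0, 10.0));
        System.out.println("45% sheds: " + exceedsThreshold(45.0, 40.0, 10.0));
    }
}
```

With a history percentage of 0.9, a single spike moves the smoothed value only slightly, which is why short bursts alone do not push a broker over the threshold.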
Develop a new load-based bundle placement strategy that balances load better, with fewer load-balancing operations and in less time, and that cooperates better with ThresholdShedder.
No user-facing API changes are required.
The main idea of the new strategy is to unify the requirements of the load shedding strategy and the bundle placement strategy, so that both consider weighted resource usage, including historical observations.
How to calculate a score for a broker?
How to select a broker for assigning a bundle?
minScore among low-load brokers, and use loadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage to adjust the size of the randomization pool
The existing placement strategy implementation will not be removed at this point. Users will be able to configure the old implementation in broker.conf. This option will be helpful if performance regressions are seen for some use cases with the new strategy implementation.
    # load assignment strategy, support LeastLongTermMessageRate and LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
    loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight

    # The broker average resource usage difference threshold.
    # Average resource usage difference threshold to determine a broker whether to be a best candidate in LeastResourceUsageWithWeight.
    # (eg: broker1 with 10% resource usage with weight and broker2 with 30% and broker3 with 80% will have 40% average resource usage.
    # The placement strategy can select broker1 and broker2 as best candidates.)
    # It only takes effect in the LeastResourceUsageWithWeight strategy.
    loadBalancerAverageResourceUsageDifferenceThresholdShedderPercentage=10
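The example in the configuration comment can be worked through with a small sketch. The names below are hypothetical and this is not the LeastResourceUsageWithWeight source. The comparison `usage + threshold <= average` is inferred from the comment (with a threshold of 10, broker2 at 30% still qualifies against a 40% average), and the fallback to all brokers when no broker qualifies is an assumption made so the sketch always returns a result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Hypothetical illustration; not Pulsar's actual classes. */
final class PlacementSketch {

    /** Brokers whose weighted usage is at least the threshold below the cluster average. */
    static List<String> bestCandidates(Map<String, Double> usagePct, double diffThresholdPct) {
        double avg = usagePct.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        List<String> best = new ArrayList<>();
        // TreeMap gives a stable, name-sorted iteration order.
        for (Map.Entry<String, Double> e : new TreeMap<>(usagePct).entrySet()) {
            if (e.getValue() + diffThresholdPct <= avg) {
                best.add(e.getKey());
            }
        }
        return best;
    }

    /** Picks one broker at random from the candidate pool. */
    static String pick(Map<String, Double> usagePct, double diffThresholdPct, Random random) {
        if (usagePct.isEmpty()) {
            throw new IllegalArgumentException("no brokers available");
        }
        List<String> pool = bestCandidates(usagePct, diffThresholdPct);
        if (pool.isEmpty()) {
            // Assumption: when no broker is clearly below average, consider all of them.
            pool = new ArrayList<>(new TreeMap<>(usagePct).keySet());
        }
        return pool.get(random.nextInt(pool.size()));
    }

    public static void main(String[] args) {
        // Values taken from the configuration comment.
        Map<String, Double> usage = Map.of("broker1", 10.0, "broker2", 30.0, "broker3", 80.0);
        System.out.println("candidates: " + bestCandidates(usage, 10.0));
        System.out.println("picked: " + pick(usage, 10.0, new Random()));
    }
}
```

A larger threshold shrinks the candidate pool toward the least loaded brokers, while a smaller one widens it, which spreads newly assigned bundles over more brokers.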
Below are screenshots of the effect of the new strategy: load balancing takes less time and triggers fewer load-balancing operations.
None yet.