| |
| # PIP-364: Introduce a new load balance algorithm AvgShedder |
| |
| # Background knowledge |
| |
| Pulsar has two load balance interfaces: |
| - `LoadSheddingStrategy` is an unloading strategy that identifies high load brokers and unloads some of the bundles they carry to reduce the load. |
| - `ModularLoadManagerStrategy` is a placement strategy responsible for assigning bundles to brokers. |
| |
| ## LoadSheddingStrategy |
| There are three available algorithms: `ThresholdShedder`, `OverloadShedder`, `UniformLoadShedder`. |
| |
| ### ThresholdShedder |
| `ThresholdShedder` uses the following method to calculate the maximum resource utilization rate for each broker, |
| which includes CPU, direct memory, bandwidth in, and bandwidth out. |
| ``` |
| public double getMaxResourceUsageWithWeight(final double cpuWeight, |
| final double directMemoryWeight, final double bandwidthInWeight, |
| final double bandwidthOutWeight) { |
| return max(cpu.percentUsage() * cpuWeight, |
| directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight, |
| bandwidthOut.percentUsage() * bandwidthOutWeight) / 100; |
| } |
| ``` |
| |
| After calculating the maximum resource utilization rate for each broker, a historical weight algorithm will |
| also be executed to obtain the final score. |
| ``` |
| historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage; |
| ``` |
| The historyPercentage is determined by configuring the `loadBalancerHistoryResourcePercentage`. |
| The default value is 0.9, which means that the last calculated score accounts for 90%, |
| while the current calculated score only accounts for 10%. |
| |
| The introduction of this historical weight algorithm is to avoid bundle switching caused by |
| short-term abnormal load increase or decrease, but in fact, this algorithm will introduce some |
| serious problems, which will be explained in detail later. |
| |
| Next, calculate the average score of all brokers in the entire cluster: `avgUsage=totalUsage/totalBrokers`. |
| When the score of any broker exceeds a certain threshold of avgUsage, it is determined that the broker is overloaded. |
| The threshold is determined by the configuration `loadBalancerBrokerThresholdShedderPercentage`, with a default value of 10. |
| |
| |
| ### OverloadShedder |
| `OverloadShedder` use the same method `getMaxResourceUsageWithWeight` to calculate the maximum resource utilization rate for each broker. |
| The difference is that `OverloadShedder` will not use the historical weight algorithm to calculate the final score, |
| the final score is the current maximum resource utilization rate of the broker. |
| |
| After obtaining the load score for each broker, compare it with the `loadBalancerBrokerOverloadedThresholdPercentage`. |
| If the threshold is exceeded, it is considered overloaded, with a default value of 85%. |
| |
| This algorithm is relatively simple, but there are many serious corner cases, so it is not recommended to use `OverloadShedder`. |
| Here are two cases: |
| - When the load on each broker in the cluster reaches the threshold, the bundle unload will continue to be executed, |
| but it will only switch from one overloaded broker to another, which is meaningless. |
| - If there are no broker whose load reaches the threshold, adding new brokers will not balance the traffic to the new added brokers. |
| The impact of these two points is quite serious, so we won't talk about it next. |
| |
| |
| ### UniformLoadShedder |
| `UniformLoadShedder` will first calculate the maximum and minimum message rates, as well as the maximum and minimum |
| traffic throughput and corresponding broker. Then calculate the maximum and minimum difference, with two thresholds |
| corresponding to message rate and throughput size, respectively. |
| |
| - loadBalancerMsgRateDifferenceShedderThreshold |
| |
| The message rate percentage threshold between the highest and lowest loaded brokers, with a default value of 50, |
| can trigger bundle unload when the maximum message rate is 1.5 times the minimum message rate. |
| For example, broker 1 with 50K msgRate and broker 2 with 30K msgRate will have a (50-30)/30=66%>50% difference in msgRate, |
| and the load balancer can unload the bundle from broker 1 to broker 2. |
| |
| - loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold |
| |
| The threshold for the message throughput multiplier between the highest and lowest loaded brokers, |
| with a default value of 4, can trigger bundle unload when the maximum throughput is 4 times the minimum throughput. |
| For example, if the msgRate of broker 1 is 450MB, broker 2 is 100MB, and the difference in msgThrough |
| is 450/100=4.5>4 times, then the load balancer can unload the bundle from broker 1 to broker 2. |
| |
| |
| After introducing the algorithm of `UniformLoadShedder`, we can clearly obtain the following information: |
| #### load jitter |
| `UniformLoadShedder` does not have the logic to handle load jitter. For example, |
| when the traffic suddenly increases or decreases. This load data point is adopted, triggering a bundle unload. |
| However, the traffic of this topic will soon return to normal, so it is very likely to trigger a bundle unload again. |
| This type of bundle unload should be avoided. This kind of scenario is very common, actually. |
| |
| #### heterogeneous environment |
| `UniformLoadShedder` does not rely on indicators such as CPU usage and network card usage to determine high load |
| and low load brokers, but rather determines them based on message rate and traffic throughput size, |
| while `ThresholdShedder` and `OverloadShedder` rely on machine resource indicators such as CPU usage to determine. |
| If the cluster is heterogeneous, such as different machines with different hardware configurations, |
| or if there are other processes sharing resources on the machine where the broker is located, |
| `UniformLoadShedder` is likely to misjudge high and low load brokers, thereby migrating the load from high-performance |
| but low load brokers to low-performance but high load brokers. |
| Therefore, it is not recommended for users to use `UniformLoadShedder` in heterogeneous environments. |
| |
| #### slow load balancing |
| `UniformLoadShedder` will only unload the bundle from one of the highest loaded brokers at a time, |
| which may take a considerable amount of time for a large cluster to complete all load balancing tasks. |
| For example, if there are 100 high load brokers in the current cluster and 100 new machines to be added, |
| it is roughly estimated that it will take 100 shedding to complete the balancing. |
| However, since the execution time interval of the `LoadSheddingStrategy` policy is determined by the |
| configuration of `loadBalancerSheddingIntervalMinutes`, which defaults to once every 1 minute, |
| so it will take 100 minutes to complete all tasks. For users using large partition topics, their tasks |
| are likely to be disconnected multiple times within this 100 minutes, which greatly affects the user experience. |
| |
| |
| ## ModularLoadManagerStrategy |
| The `LoadSheddingStrategy` strategy is used to unload bundles of high load brokers. However, in order to |
| achieve a good load balancing effect, it is necessary not only to "unload" correctly, but also to "load" correctly. |
| The `ModularLoadManagerStrategy` strategy is responsible for assigning bundles to brokers. |
| The coordination between `LoadSheddingStrategy` and `ModularLoadManagerStrategy` is also a key point worth paying attention to. |
| |
| ### LeastLongTermMessageRate |
| The `LeastLongTermMessageRate` algorithm directly used the maximum resource usage of CPU and so on as the broker's score, |
| and reused the `OverloadShedder` configuration, `loadBalancerBrokerOverloadedThresholdPercentage`. |
| If the score is greater than it (default 85%), set `score=INF`; Otherwise, update the broker's score to the sum of the |
| message in and out rates obtained from the broker's long-term aggregation. |
| ``` |
| score = longTerm MsgIn rate+longTerm MsgOut rate, |
| ``` |
| Finally, randomly select a broker from the broker with the lowest score to return. If the score of each broker is INF, |
| randomly select broker from all brokers. |
| |
| The scoring algorithm in `LeastLongTermMessageRate` is essentially based on message rate. Although it initially examines |
| the maximum resource utilization, it is to exclude overloaded brokers only. |
| Therefore, in most cases, brokers are sorted based on the size of the message rate as a score, which results in the same |
| issues with heterogeneous environments, similar to `UniformLoadShedder`. |
| |
| |
| #### Effect of the combination of `LoadSheddingStrategy` and `LeastLongTermMessageRate` |
| Next, we will attempt to analyze the effect together with the `LoadSheddingStrategy`. |
| - **LeastLongTermMessageRate + OverloadShedder** |
| This is the initial combination, but due to some inherent flaws in `OverloadShedder`, **it is not recommended**. |
| |
| - **LeastLongTermMessageRate + ThresholdShedder** |
| This combination is even worse than `LeastLongTermMessageRate + OverloadShedder` and **is not recommended**. |
| Because `OverloadShedder` uses the maximum weighted resource usage and historical score to score brokers, |
| while LeastLongTermMessage Rate is scored based on message rate. Inconsistent unloading and placement criteria |
| can lead to incorrect load balancing execution. |
| This is also why a new placement strategy `LeastResourceUsageWithWeight` will be introduced later. |
| |
| - **LeastLongTermMessageRate + UniformLoadShedder** |
| This is **recommended**. Both uninstallation and placement policy are based on message rate, |
| but using message rate as a standard naturally leads to issues with heterogeneous environments. |
| |
| |
| ### LeastResourceUsageWithWeight |
| `LeastResourceUsageWithWeight` uses the same scoring algorithm as `ThresholdShedder` to score brokers, which uses |
| weighted maximum resource usage and historical scores to calculate the current score. |
| |
| Next, select candidate brokers based on the configuration of `loadBalancerAverageResourceUsageDifferenceThresholdPercentage`. |
| If a broker's score plus this threshold is still not greater than the average score, the broker will be added to the |
| candidate broker list. After obtaining the candidate broker list, a broker will be randomly selected from it; |
| If there are no candidate brokers, randomly select from all brokers. |
| |
| For example, if the resource utilization rate of broker 1 is 10%, broker 2 is 30%, and broker 3 is 80%, |
| the average resource utilization rate is 40%. The placement strategy can choose Broker1 and Broker2 |
| as the best candidates, as the thresholds are 10, 10+10<=40, 30+10<=40. In this way, the bundles uninstalled |
| from broker 3 will be evenly distributed among broker 1 and broker 2, rather than being completely placed on broker 1. |
| |
| #### over placement problem |
| Over placement problem is that the bundle is placed on high load brokers and make them overloaded. |
| |
| In practice, it will be found that it is difficult to determine a suitable value for `loadBalancerAverageResourceUsageDifferenceThresholdPercentage`, |
| which often triggers a fallback global random selection logic. For example, if there are 6 brokers in the current |
| cluster, with scores of 40, 40, 40, 40, 69, and 70 respectively, the average score is 49.83. |
| Using the default configuration, there are no candidate brokers because 40+10>49.83. |
| Triggering a bottom-up global random selection logic and the bundle may be offloaded from the overloaded broker5 |
| to the overloaded broker6, or vice versa, **causing the over placement problem.** |
| |
| Attempting to reduce the configuration value to expand the random pool, such as setting it to 0, may also include some |
| overloaded brokers in the candidate broker list. For example, if there are 5 brokers in the current cluster with scores |
| of 10, 60, 70, 80, and 80 respectively, the average score is 60. As the configuration value is 0, then broker 1 and |
| broker 2 are both candidate brokers. If broker 2 shares half of the offloaded traffic, **it is highly likely to overload.** |
| |
| Therefore, it is difficult to configure the `LeastResourceUsageWithWeight` algorithm well to avoid incorrect load balancing. |
| Of course, if you want to use the `ThresholdShedder` algorithm, the combination of `ThresholdShedder+LeastResourceUsageWithWeight` |
| will still be superior to the combination of `ThresholdShedder+LeastLongTermMessageRate`, because at least the scoring algorithm |
| of `LeastResourceUsageWithWeight` is consistent with that of `ThresholdShedder`. |
| |
| #### why doesn't LeastLongTermMessage Rate have over placement problem? |
| The root of over placement problem is that the frequency of updating the load data is limited due to the performance |
| of zookeeper. If we assign a bundle to a broker, the broker's load will increase after a while, and it's load data |
| also need some time to be updated to leader broker. If there are many bundles unloaded in a shedding, |
| how can we assign these bundles to brokers? |
| |
| The most simple way is to assign them to the broker with the lowest load, but it may cause the over placement problem |
| as it is most likely that there is only one single broker with the lowest load. With all bundles assigned to this broker, |
| it will be overloaded. This is the reason why `LeastResourceUsageWithWeight` try to determine a candidate broker list |
| to avoid the over placement problem. But we also find that candidate broker list can be empty or include some overloaded |
| brokers, which will also cause the over placement problem. |
| |
| So why doesn't `LeastLongTermMessageRate` have over placement problem? The reason is that each time a bundle is assigned, |
| the bundle will be added into `PreallocatedBundleData`. When scoring a broker, not only will the long-term message rate |
| aggregated by the broker itself be used, but also the message rate of bundles in `PreallocatedBundleData` that have been |
| assigned to the broker but have not yet been reflected in the broker's load data will be calculated. |
| |
| For example, if there are two bundles with 20KB/s message rate to be assigned, and broker1 and broker2 at 100KB/s |
| and 110KB/s respectively. The first bundle is assigned to broker1, However, broker1's load data will not be updated |
| in the short term. Before the load data is updated, `LeastLongTermMessageRate` try to assign the second bundle. |
| At this time, the score of broker1 is 100+20=120KB/s, where 20KB/s is the message rate of the first bundle |
| from `PreallocatedBundleData`. As broker1's score is greater than broker2, the second bundle will be assigned to broker2. |
| |
| **`LeastLongTermMessageRate` predict the load of the broker after the bundle is assigned to avoid the over placement problem.** |
| |
| **Why doesn't `LeastResourceUsageWithWeight` have this feature? Because it is not possible to predict how much resource |
| utilization a broker will increase when loading a bundle. All algorithms scoring brokers based on resource utilization |
| can't fix the over placement problem with this feature.** |
| So `LeastResourceUsageWithWeight` try to determine a candidate broker list to avoid the over placement problem, which is |
| proved to be not a good solution. |
| |
| |
| #### over unloading problem |
| Over unloading problem is that the load offloaded from high load brokers is too much and make them underloaded. |
| |
| Finally, let's talk about the issue of historical weighted scoring algorithms. The historical weighted scoring algorithm |
| is used by the `ThresholdShedder` and `LeastResourceUsageWithWeight` algorithms, as follows: |
| ``` |
| HistoryUsage=historyUsage=null? ResourceUsage: historyUsage * historyPercentage+(1- historyPercentage) * resourceUsage; |
| ``` |
| The default value of historyPercentage is 0.9, indicating that the score calculated last time has a significant impact on the current score. |
| The current maximum resource utilization only accounts for 10%, which is to solves the problem of load jitter. |
| However, introducing this algorithm has its side effects, such as over unloading problem. |
| |
| For example, there is currently one broker1 in the cluster with a load of 90%, and broker2 is added with a current load of 10%. |
| - At the first execution of shedding: broker1 scores 90, broker2 scores 10. For simplicity, assuming that the algorithm will |
| move some bundles to make their load the same, thus the true load of broker 1 and broker 2 become 50 after load shedding is completed. |
| - At the second execution of shedding: broker1 scores 90*0.9+50*0.1=86, broker2 scores 10*0.9+50*0.1=14. |
| **Note that the actual load of broker1 here is 50, but it is overestimated as 86!** |
| **The true load of broker2 is also 50, but it is underestimated at 14!** |
| Due to the significant difference in ratings between the two, although their actual loads are already the same, |
| broker1 will continue to unload traffic corresponding to 36 points from broker1 to broker2, |
| resulting in broker1's actual load score becoming 14, broker2's actual load score becoming 86. |
| |
| - At the third execution of shedding: broker1 scored 86*0.9+14*0.1=78.8, broker2 scored 14*0.9+86*0.1=21.2. |
| It is ridiculous that broker1 is still considered overloaded, and broker2 is still considered underloaded. |
| All loads in broker1 are moved to broker2, which is the over unloading problem. |
| |
| Although this example is an idealized theoretical analysis, we can still see that using historical scoring algorithms |
| can seriously overestimate or underestimate the true load of the broker. Although it can avoid the problem of load jitter, |
| it will introduce a more serious and broader problem: **overestimating or underestimating the true load of the broker, |
| leading to incorrect load balancing execution**. |
| |
| |
| ## Summary |
| Based on the previous analysis, although we have three shedding strategies and two placement strategies |
| that can generate 6 combinations of 3 * 2, we actually only have two recommended options: |
| - ThresholdShedder + LeastResourceUsageWithWeight |
| - UniformLoadShedder + LeastLongTermMessageRate |
| |
| These two options each have their own advantages and disadvantages, and users can choose one according to |
| their requirements. The following table summarizes the advantages and disadvantages of the two options: |
| |
| | Combination | heterogeneous environment | load jitter | over placement problem | over unloading problem | slow load balancing | |
| |---------------------------------------------|---------------------------|------------|-----------------------|-----------------------|---------------------| |
| | ThresholdShedder + LeastResourceUsageWithWeight | normal(1) | good | bad | bad | normal(1) | |
| | UniformLoadShedder + LeastLongTermMessageRate | bad(2) | bad | good | good | normal(1) | |
| |
| 1. In terms of adapting to heterogeneous environments, `ThresholdShedder+LeastResourceUsageWithWeight` can |
| only be rated as `normal`. This is because `ThresholdShedder` is not fully adaptable to heterogeneous environments. |
| Although it does not misjudge overloaded brokers as underloaded, heterogeneous environments can still have a |
| significant impact on the load balancing effect of `ThresholdShedder`. |
| For example, there are three brokers in the current cluster with resource utilization rates of 10, 50, and 70, respectively. |
| Broker1 and Broker2 are isomorphic. Though Broker3 don't bear any load, its resource utilization rate has |
| reached to 70 due to the deployment of other processes at the same machine. |
| At this point, we would like broker 1 to share some of the pressure from broker2, but since the average load is |
| 43.33, 43.33+10>50, broker2 will not be judged as overloaded, and overloaded broker 3 also has no traffic to |
| unload, causing the load balancing algorithm to be in an inoperable state. |
| |
| 2. In the same scenario, if `UniformLoadShedder+LeastLongTermMessageRate` is used, the problem will be more |
| severe, as some of the load will be offloaded from broker2 to broker3. As a result, the performance of those |
| topics in broker3 services will experience significant performance degradation. |
| Therefore, it is not recommended to run Pulsar in heterogeneous environments as current load balancing algorithms |
| cannot adapt too well. If it is unavoidable, it is recommended to choose `ThresholdShedder+LeastResourceUsageWithWeight`. |
| |
| 3. In terms of load balancing speed, although `ThresholdShedder+LeastResourceUsageWithWeight` can unload the load |
| of all overloaded brokers at once, historical scoring algorithms can seriously affect the accuracy of load |
| balancing decisions. Therefore, in reality, it also requires multiple load balancing executions to finally |
| stabilize. This is why the load balancing speed of `ThresholdShedder+LeastResourceUsageWithWeight` is rated as `normal`. |
| |
| 4. In terms of load balancing speed, `UniformLoadShedder+LeastLongTermMessageRate` can only unload the load of one |
| overloaded broker at a time, so it takes a long time to complete load balancing when there are many brokers, |
| so it is also rated as `normal`. |
| |
| |
| # Motivation |
| |
| The current load balance algorithm has some serious problems, such as load jitter, heterogeneous environment, slow load balancing, etc. |
| This PIP aims to introduce a new load balance algorithm `AvgShedder` to solve these problems. |
| |
| # Goals |
| |
| Introduce a new load balance algorithm `AvgShedder` that can solve the problems of load jitter, heterogeneous environment, slow load balancing, etc. |
| |
| |
| # High Level Design |
| |
| ## scoring criterion |
| First of all, to determine high load brokers, it is necessary to rate and sort them. |
| Currently, there are two scoring criteria: |
| - Resource utilization rate of broker |
| - The message rate and throughput of the broker |
| Based on the previous analysis, it can be seen that scoring based on message rate and throughput will face |
| the same problem as `UniformLoadShedder` in heterogeneous environments, while scoring based on resource utilization |
| rate will face the over placement problem like `LeastResourceUsageWithWeight`. |
| |
| **To solve the problem of heterogeneous environments, we use the resource utilization rate of the broker as the scoring criterion.** |
| |
| |
| ## binding shedding and placement strategies |
| So how can we avoid the over placement problem? **The key is to bind the shedding and placement strategies together.** |
| If every bundle unloaded from the high load broker is assigned to the right low load broker in shedding strategy, |
| the over placement problem will be solved. |
| |
| For example, if the broker rating of the current cluster is 20,30,52,80,80, and the shedding and placement strategies are decoupled, |
| the bundles will be unloaded from the two brokers with score of 80, and then all these bundles will be placed on the broker with a |
| score of 20, causing the over placement problem. |
| |
| If the shedding and placement strategies are coupled, one broker with 80 score can unload some bundles to a broker with 20 score, |
| and another broker with 80 score can unload the bundle to the broker with 30 score. In this way, we can avoid the over placement problem. |
| |
| |
| ## evenly distributed traffic between the highest and lowest loaded brokers |
| We will first pick out the highest and lowest loaded brokers, and then evenly distribute the traffic between them. |
| |
| For example, if the broker rating of the current cluster is 20,30,52,70,80, and the message rate of the highest loaded broker is 1000, |
| the message rate of the lowest loaded broker is 500. We introduce a threshold to whether trigger the bundle unload, for example, |
| the threshold is 40. As the difference between the score of the highest and lowest loaded brokers is 100-50=50>40, |
| the shedding strategy will be triggered. |
| |
| To achieve the goal of evenly distributing the traffic between the highest and lowest loaded brokers, the shedding strategy will |
| try to make the message rate of two brokers the same, which is (1000+500)/2=750. The shedding strategy will unload 250 message rate from the |
| highest loaded broker to the lowest loaded broker. After the shedding strategy is completed, the message rate of two brokers will be |
| same, which is 750. |
| |
| |
| ## improve the load balancing speed |
| As we mentioned earlier in `UniformLoadShedder`, if strategy only handles one high load broker at a time, it will take a long time to |
| complete all load balancing tasks. Therefore, we further optimize it by matching multiple pairs of high and low load brokers in |
| a single shedding. After sorting the broker scores, the first and last place are paired, the second and and the second to last are paired, |
| and so on. When the score difference between the two paired brokers is greater than the threshold, the load will be evenly distributed |
| between the two, which can solve the problem of slow speed. |
| |
| For example, if the broker rating of the current cluster is 20,30,52,70,80, we will pair 20 and 80, 30 and 70. As the difference between |
| the two paired brokers is 80-20=60, 70-30=40, which are both greater than the threshold 40, the shedding strategy will be triggered. |
| |
| |
| ## handle load jitter with multiple hits threshold |
| What about the historical weighting algorithm used in `ThresholdShedder`? It is used to solve the problem of load jitter, but previous |
| analysis and experiments have shown that it can bring serious negative effects, so we can no longer use this method to solve the |
| problem of load jitter. |
| |
| We mimic the way alarms are triggered: the threshold is triggered multiple times before the bundle unload is finally triggered. |
| For example, when the difference between a pair of brokers exceeds the threshold three times, load balancing is triggered. |
| |
| ## high and low threshold |
| In situations of cluster rolling restart or expansion, there is often a significant load difference between |
| different brokers, and we hope to complete load balancing more quickly. |
| |
| Therefore, we introduce two thresholds: |
| - loadBalancerAvgShedderLowThreshold, default value is 15 |
| - loadBalancerAvgShedderHighThreshold, default value is 40 |
| |
| Two thresholds correspond to two continuous hit count requirements: |
| - loadBalancerAvgShedderHitCountLowThreshold, default value is 8 |
| - loadBalancerAvgShedderHitCountHighThreshold, default value of 2 |
| |
| When the difference in scores between two paired brokers exceeds the `loadBalancerAvgShedderLowThreshold` by |
| `loadBalancerAvgShedderHitCountLowThreshold` times, or exceeds the `loadBalancerAvgShedderHighThreshold` by |
| `loadBalancerAvgShedderHitCountHighThreshold` times, a bundle unload is triggered. |
| For example, with the default value, if the score difference exceeds 15, it needs to be triggered 8 times continuously, |
| and if the score difference exceeds 40, it needs to be triggered 2 times continuously. |
| |
| The larger the load difference between brokers, the smaller the number of times it takes to trigger bundle unloads, |
| which can adapt to scenarios such as cluster rolling restart or expansion. |
| |
| ## placement strategy |
| As mentioned earlier, `AvgShedder` bundles the shedding and placement strategies, and a bundle has already determined |
| its next owner broker based on the shedding strategy during shedding. But we not only use placement strategies after |
| executing shedding, but also need to use placement strategies to assign bundles during cluster initialization, rolling |
| restart, and broker shutdown. So how should we assign these bundles without shedding strategies? |
| |
| We use a hash allocation method: hash mapping a random number to broker. Hash mapping roughly conforms to |
| a uniform distribution, so bundles will be roughly evenly distributed across all brokers. However, due to the different |
| throughput between different bundles, the cluster will exhibit a certain degree of imbalance. However, this problem is |
| not significant, and the subsequent balancing can be achieved through shedding strategies. Moreover, the frequency of |
| cluster initialization, rolling restart, and broker shutdown scenarios is not high, so the impact is slight. |
| |
| ## summary |
| In summary, `AvgShedder` can solve the problems of load jitter, heterogeneous environment, slow load balancing, etc. |
| Following table summarizes the advantages and disadvantages of the three options: |
| |
| | Combination | heterogeneous environment | load jitter | over placement problem | over unloading problem | slow load balancing | |
| |---------------------------------------------|------------------------|------------|-----------------------|-----------------------|--------------| |
| | ThresholdShedder + LeastResourceUsageWithWeight | normal | good | bad | bad | normal | |
| | UniformLoadShedder + LeastLongTermMessageRate | bad | bad | good | good | normal | |
| | AvgShedder | normal | good | good | good | good | |
| |
| |
| # Detailed Design |
| |
| ### Configuration |
| |
| To avoid introducing too many configurations when calculating how much traffic needs to be unloaded, `AvgShedder` reuses the |
| following three `UniformLoadShedder` configurations: |
| ``` |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload." |
| ) |
| private int minUnloadMessage = 1000; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload." |
| ) |
| private int minUnloadMessageThroughput = 1 * 1024 * 1024; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "In the UniformLoadShedder strategy, the maximum unload ratio." |
| ) |
| private double maxUnloadPercentage = 0.2; |
| ``` |
| |
| The `maxUnloadPercentage` controls the allocation ratio. Although the default value is 0.2, our goal is to evenly distribute the |
| pressure between two brokers. Therefore, we set the value to 0.5, so that after load balancing is completed, the message rate/throughput |
| of the two brokers will be almost equal. |
| |
| The following configurations are introduced to control the shedding strategy: |
| ``` |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "The low threshold for the difference between the highest and lowest loaded brokers." |
| ) |
| private int loadBalancerAvgShedderLowThreshold = 15; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "The high threshold for the difference between the highest and lowest loaded brokers." |
| ) |
| private int loadBalancerAvgShedderHighThreshold = 40; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "The number of times the low threshold is triggered before the bundle is unloaded." |
| ) |
| private int loadBalancerAvgShedderHitCountLowThreshold = 8; |
| |
| @FieldContext( |
| dynamic = true, |
| category = CATEGORY_LOAD_BALANCER, |
| doc = "The number of times the high threshold is triggered before the bundle is unloaded." |
| ) |
| private int loadBalancerAvgShedderHitCountHighThreshold = 2; |
| ``` |
| |
| |
| |
| # Backward & Forward Compatibility |
| |
| Fully compatible. |
| |
| # General Notes |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/cy39b6jp38n38zyzd3bbw8b9vm5fwf3f |
| * Mailing List voting thread: https://lists.apache.org/thread/2v9fw5t5m5hlmjkrvjz6ywxjcqpmd02q |