As previously shared with the community, we have identified many areas for improvement in the Pulsar load balancer[1]. Since these improvements require significant changes, we would first like to share the overall goals of this project and the high-level components to be designed. This doc highlights the architecture of the new broker load balancer.
We set up the project goals in the following areas.
We will add a transfer unload option, --dest, to unload the topic (bundle) specifically to a destination broker.
```
pulsar-admin topics unload persistent://tenant/namespace/topic --dest ${destination_broker}
```
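For completeness, here is a hedged sketch of how the same transfer could look through the Java admin client. The destination-aware overload of `unloadNamespaceBundle` is an assumption that mirrors the CLI option above and may differ from the final API.

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class TransferUnloadExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Hypothetical destination-aware unload mirroring `pulsar-admin topics unload --dest`;
            // the exact admin API surface may differ in the final implementation.
            String namespace = "tenant/namespace";
            String bundleRange = "0x00000000_0xffffffff";
            String destinationBroker = "broker-1.example.com:8080";
            admin.namespaces().unloadNamespaceBundle(namespace, bundleRange, destinationBroker);
        }
    }
}
```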
This bundle state channel is a persistent-topic table view used as a write-ahead log (WAL) to broadcast a total order of all bundle state changes in the cluster. All brokers asynchronously consume the messages in this channel in the same order and react to the bundle state changes (sequential consistency). With table-view compaction, the bundle state channel eventually materializes the current bundle-broker ownership. Read operations on this channel (e.g., clients’ topic lookup requests) can be deferred for a few seconds, depending on the current state of the bundle.
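As a minimal sketch of how a broker could materialize this channel with the Pulsar TableView client API; the channel topic name and the value class below are illustrative assumptions, not the final design:

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

public class BundleStateChannelView {
    // Illustrative value type for a bundle state record; the real schema may differ.
    public static class BundleStateData {
        public String state;   // e.g. "Assigned", "Assigning", "Unassigned", "Splitting"
        public String broker;  // current or destination owner broker
    }

    public static TableView<BundleStateData> create(PulsarClient client) throws Exception {
        // Hypothetical system topic name for the bundle state channel.
        TableView<BundleStateData> channel = client.newTableView(Schema.JSON(BundleStateData.class))
                .topic("persistent://pulsar/system/bundle-state-channel")
                .create();

        // Observe bundle state updates as the table view materializes them,
        // in the same order on every broker that consumes the channel.
        channel.forEachAndListen((bundle, data) ->
                System.out.printf("bundle=%s state=%s broker=%s%n", bundle, data.state, data.broker));
        return channel;
    }
}
```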
We define the following states and actions and linearize the bundle state changes. (This is a high-level design to explain the concept; the final version may differ.)
Bundle Actions
Bundle States
*New client connections to the bundle are deferred (with timeouts) while the bundle is in the Assigning state.
The bundle state channel can be used as follows.
(State, Action) Sequence: (Assigned, Transfer) => (Assigning, Return) => (Assigned,)
(State, Action) Sequence: (Assigned, Split) => (Splitting, Unload | Create) => {(Unassigned, ) | (Assigned, ), (Assigned, )}
(State, Action) Sequence: (Unassigned, Own) => (Assigning, Return) => (Assigned,)
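A minimal Java sketch of the state machine implied by the sequences above; the state and action names come from this document, while the transition table itself is an assumption and may differ in the final version:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class BundleStateMachine {
    public enum State { Unassigned, Assigning, Assigned, Splitting }
    public enum Action { Own, Return, Transfer, Split, Unload, Create }

    // Valid actions per state, derived from the (State, Action) sequences above.
    // Illustrative only; the final transition table may differ.
    private static final Map<State, Set<Action>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.Unassigned, EnumSet.of(Action.Own));
        VALID.put(State.Assigning, EnumSet.of(Action.Return));
        VALID.put(State.Assigned, EnumSet.of(Action.Transfer, Action.Split, Action.Unload));
        VALID.put(State.Splitting, EnumSet.of(Action.Unload, Action.Create));
    }

    public static boolean isValid(State current, Action action) {
        return VALID.getOrDefault(current, EnumSet.noneOf(Action.class)).contains(action);
    }
}
```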
Because the bundle state channel represents the current bundle-broker ownership, we can remove the redundant bundle ownership store (ZK znodes). Each broker will consult the bundle state channel to check which broker currently owns a requested bundle, or whether the bundle is in the middle of an ownership assignment or unload (transfer). In addition, before returning a lookup result, the broker availability metadata store (LocalBrokerData znode existence) can be checked to further confirm the owner broker's availability.
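A sketch of that lookup path, reusing the BundleStateChannelView sketch above and Pulsar's MetadataStore interface; /loadbalance/brokers is the existing broker registration location, and the remaining names are illustrative:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.metadata.api.MetadataStore;

public class BundleOwnerLookup {
    private final TableView<BundleStateChannelView.BundleStateData> channel;
    private final MetadataStore metadataStore;

    public BundleOwnerLookup(TableView<BundleStateChannelView.BundleStateData> channel,
                             MetadataStore metadataStore) {
        this.channel = channel;
        this.metadataStore = metadataStore;
    }

    // Returns the available owner broker, or empty if the bundle is unassigned,
    // still in Assigning/Splitting, or the recorded owner is not alive.
    public CompletableFuture<Optional<String>> lookupOwner(String bundle) {
        BundleStateChannelView.BundleStateData data = channel.get(bundle);
        if (data == null || !"Assigned".equals(data.state)) {
            // Defer the lookup (e.g., retry after a short delay) until the state settles.
            return CompletableFuture.completedFuture(Optional.empty());
        }
        // Check the broker availability metadata (LocalBrokerData znode existence).
        return metadataStore.exists("/loadbalance/brokers/" + data.broker)
                .thenApply(alive -> alive ? Optional.of(data.broker) : Optional.empty());
    }
}
```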
The Bundle State Channel (BSC) is itself a topic, and because of this circular dependency, we can't use the BSC to find the owner broker of the BSC topic. For example, when a cluster starts, each broker needs to issue a TopicLookUp for the BSC (to find its owner broker) in order to consume the messages in the BSC. However, initially, no broker knows which broker owns the BSC.
ZK leader election is a good option to break this circular dependency, as follows.
The cluster can use the ZK leader election to select the owner broker. If the owner becomes unavailable, one of the followers will become the new owner. We can elect the owner for each bundle state partition.
Then, in brokers’ TopicLookUp logic, we will add a special case to return the current leader (the elected BSC owner) for the BSC topics.
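For illustration only, here is what such an election could look like with Apache Curator's LeaderLatch recipe on ZK. The election path and broker id are assumptions, and the actual broker implementation may use Pulsar's own coordination/metadata layer instead:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class BscOwnerElection {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // Each broker joins the election for the BSC owner; the path is illustrative.
        String brokerId = "broker-1.example.com:8080";
        LeaderLatch latch = new LeaderLatch(zk, "/loadbalance/bundle-state-channel-owner", brokerId);
        latch.start();

        // Brokers' TopicLookUp logic can special-case the BSC topic and return the
        // current leader's id as the owner broker. (In practice, wait/retry until
        // a leader has actually been established.)
        String bscOwner = latch.getLeader().getId();
        System.out.println("BSC owner broker: " + bscOwner);

        latch.close();
        zk.close();
    }
}
```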
Without distributed locks, we can resolve conflicting state changes with a conflict resolution algorithm in an optimistic and eventually consistent manner. Brokers take the first valid state change in the linearized view as the winner and ignore later conflicting ones.
One caveat: because the current table-view compaction keeps only the last value for each key, we need to introduce an internal compaction algorithm for this channel that follows the conflict resolution algorithm (the first valid state change wins as the resulting value).
Bundle State Conflict Resolution Algorithm Example

```
For each bundle:
    // A[i] is a linearized bundle state change action at i, and
    // S is the current bundle state after A[i-1],
    // where the sequence number i monotonically increases.
    for each A[i] and S:
        If A[i] is invalid from S:  // no arrows in the state diagram
            Reject A[i]
        Else:
            Accept A[i]
```
For instance, let’s say two conflicting assignments are initiated for bundle x. The linearized state change messages will look like the following:
(own, to:B), (own, to:A)
By the conflict resolution algorithm, the second state change (own, to:A) will be ignored by all brokers (and by the compaction algorithm). Eventually, the “return” message will be broadcast, declaring that the owner is “B”:
(own, to:B), (own, to:A), (return, to:B)
Let’s take another example. Say bundle x is already assigned to broker B, but another broker initiates the “own” action (before consuming the “return” action). This last “own” state change will be ignored, since the “own” action is invalid from the previous state “assigned” (in the above state diagram, there is no “own” action arrow from the “assigned” state):
(own, to:B), (return, to:B), (own, to:A)
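Putting the pieces together, here is a sketch of the first-valid-wins replay over the linearized messages from the first example, reusing the illustrative BundleStateMachine above:

```java
import java.util.List;

public class ConflictResolutionExample {
    record StateChange(BundleStateMachine.Action action, String broker) {}

    public static void main(String[] args) {
        // Linearized messages for bundle x: two conflicting "own" requests,
        // then the "return" broadcast from the accepted owner B.
        List<StateChange> channel = List.of(
                new StateChange(BundleStateMachine.Action.Own, "B"),
                new StateChange(BundleStateMachine.Action.Own, "A"),   // rejected: Own is invalid from Assigning
                new StateChange(BundleStateMachine.Action.Return, "B"));

        BundleStateMachine.State state = BundleStateMachine.State.Unassigned;
        String owner = null;
        for (StateChange change : channel) {
            if (!BundleStateMachine.isValid(state, change.action())) {
                continue; // every broker (and the compaction) ignores invalid changes
            }
            // Accept the first valid change and advance the state (illustrative transitions).
            switch (change.action()) {
                case Own -> { state = BundleStateMachine.State.Assigning; owner = change.broker(); }
                case Return -> state = BundleStateMachine.State.Assigned;
                default -> { }
            }
        }
        System.out.println("state=" + state + " owner=" + owner); // state=Assigned owner=B
    }
}
```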
When state change participants (brokers) suddenly become unavailable, a state change can become an orphan because the participants can no longer play their roles. For these orphan state changes, the leader broker will run orphan state clean-up logic. For instance, the leader can add bundle state clean-up logic to the broker unavailability notification handler (znode watcher) in order to clean up pending bundle state changes and ownerships from unavailable brokers. To make the clean-up logic further fault-tolerant, the leader broker will also run the clean-up function when it initializes. Additionally, we could make the leader periodically call the clean-up in a separate monitor thread (we shouldn't call this clean-up redundantly too often).
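A minimal sketch of wiring these three cleanup triggers together (initialization, broker-unavailability notification, and a periodic monitor); the cleanup method body and hook names are hypothetical:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OrphanStateCleanupScheduler {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Hypothetical cleanup: release pending state changes and ownerships
    // that reference unavailable brokers.
    private void cleanupOrphanStates() { /* ... */ }

    // 1) Run once when the leader initializes (fault tolerance across leader changes).
    public void onLeaderInit() {
        cleanupOrphanStates();
        // 3) Also run periodically from a separate monitor thread, but not too often.
        scheduler.scheduleAtFixedRate(this::cleanupOrphanStates, 1, 1, TimeUnit.MINUTES);
    }

    // 2) Run when a broker-unavailability notification (znode watcher) fires.
    public void onBrokerUnavailable(String brokerId) {
        scheduler.execute(this::cleanupOrphanStates);
    }

    public void close() {
        scheduler.shutdownNow();
    }
}
```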
Every broker will be notified when its ZK session experiences a connection issue. The brokers will then enter a “safe” mode, serving the existing topics as-is but not allowing ZK-related operations. The leader won't run the bundle clean-up, transfer, or unload logic while it knows ZK is down.
When ZK comes back, each broker will know its ZK session has been re-established. The brokers will wait 2-3 minutes for all brokers to complete the ZK handshake. Then, they will recover the bundle state table view and return to the normal mode.
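An illustrative sketch of this session handling with a raw ZooKeeper watcher; the actual brokers react to metadata-store session events, and the safe-mode flag and the 3-minute delay here are assumptions:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class SafeModeSessionWatcher implements Watcher {
    private final AtomicBoolean safeMode = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void process(WatchedEvent event) {
        switch (event.getState()) {
            case Disconnected, Expired ->
                    // Serve existing topics as-is, but stop ZK-dependent load balance operations.
                    safeMode.set(true);
            case SyncConnected ->
                    // Wait a few minutes for all brokers to finish the ZK handshake,
                    // then rebuild the bundle state table view and leave safe mode.
                    scheduler.schedule(() -> safeMode.set(false), 3, TimeUnit.MINUTES);
            default -> { }
        }
    }

    public boolean isSafeMode() {
        return safeMode.get();
    }

    public static void main(String[] args) throws Exception {
        SafeModeSessionWatcher watcher = new SafeModeSessionWatcher();
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, watcher);
        // ... broker logic checks watcher.isSafeMode() before cleanup/transfer/unload ...
        zk.close();
    }
}
```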
Expected read/write traffic:
Write: relatively few messages on the write path, with occasional spikes.
Read: the fan-out broadcast could become a bottleneck when the cluster is enormous.
This bundle state channel is relatively lightweight on the producer side because bundle state changes are relatively infrequent. Still, message dispatch to consumers could be heavy if the cluster is very large. The same issue can affect the other table views (BrokerLoadDataStorage) introduced in this proposal. We could consider the following methods to scale the table views’ produce/consume rates in a large cluster.
The simplest option is to split a massive broker cluster into multiple clusters with different endpoints. The BookKeeper and configuration layers can be shared among the broker clusters.
One can build the table views on partitioned topics and distribute the message load across multiple partition owner brokers (see the sketch after these options).
As a conventional scalability method, one could shard the cluster into multiple groups of brokers and create a separate channel for each broker shard. This requires an additional discovery layer to map topics to broker shards (which also needs to align with Namespace Isolation Policies).
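For the partitioned-table-view option, here is a sketch of the setup; the topic name and partition count are illustrative:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

public class PartitionedChannelExample {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://pulsar/system/bundle-state-channel"; // illustrative

        // Create the channel as a partitioned topic so that message load is spread
        // across multiple partition owner brokers.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) {
            admin.topics().createPartitionedTopic(topic, 8);
        }

        // Table views read from all partitions transparently.
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build()) {
            TableView<String> view = client.newTableView(Schema.STRING)
                    .topic(topic)
                    .create();
            System.out.println("entries: " + view.size());
        }
    }
}
```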
We should note that this metadata sync scalability issue is not new in Pulsar, as the current Pulsar uses n-way replication. For instance, all brokers' and all bundles' load metadata are replicated to all brokers via ZK watchers: the distributed ZK servers send znode watch notifications to their clients (brokers). In this proposal, multiple table-view owner brokers (with partitioned table views) can dispatch metadata change messages to the participants (brokers).
We think this metadata sync scalability issue is relatively low priority, as only a few customers run Pulsar clusters at such a large scale. We could first ask those customers to split the cluster into multiple clusters and then enable partitioned table views; it is not practical for a single cluster to have thousands of brokers. However, we still want to ensure this design is seamlessly extensible, as a two-way-door decision.
As this PIP touches almost every area (data models, event handlers, cache/storage, logs/metrics), creating a new load balancer and isolating the new code is safer and cleaner. Customers can then safely enable or disable the new load balancer via configuration before the old one is deprecated.
This gives us the flexibility to start fresh, without the baggage of existing choices, and to try a significantly different approach. The current ModularLoadManagerImpl will not go away. Once the new load manager is ready and considered stable enough, there may be a new discussion on whether to change the default implementation. Even then, users will still be able to opt for the old load manager.
The following excludes logic and algorithm modifications, as this PIP does not focus on logic and algorithm improvements.
Goals | Before | After |
---|---|---|
Make load balance operations fault-tolerant and consistent among brokers | The leader broker sends load balance commands to the owner brokers via RPC with retries. | We introduce a global bundle state channel (a persistent-topic table view), where a total order of bundle commands is reliably persisted and broadcast to all brokers. |
Distribute load balance operations | The leader broker decides on bundle assignment, unload, and splitting. The owner brokers run the unload and split operations, notified via RPC. | Each broker decides on and runs bundle assignment and split operations. The leader decides on bundle unload (transfer), and the owner brokers run the unload operation, notified via the bundle state channel. |
Reduce load data replication among brokers | All brokers’ and all bundles’ load data are stored in ZK and replicated to all brokers via ZK watchers. | All brokers’ load data is replicated to all brokers via a non-persistent topic (table view). Only the top-n bundles’ load data from each broker is replicated to the leader broker via a non-persistent topic (table view). |
Minimize topic unavailability from unloading | After topic connections are closed, clients reconnect to a new broker, and the new broker initiates a new topic assignment. The leader broker assigns a new owner, and eventually, the client is redirected to the new owner broker. | We introduce a new unload option, “transfer”, where the new owner is pre-assigned before the topic connections are closed. Clients immediately redirect to the new owner broker without client-initiated topic assignments. |
Share bundle-broker ownership metadata among brokers for owner broker discovery | The bundle-broker ownership data is stored in ZK. All brokers read bundle ownership info upon TopicLookUp requests (caching local bundle ownership info). | The global ownership data is stored in the bundle state channel (a persistent-topic table view). With compaction, all brokers read the latest global ownership table view (cached in memory) upon TopicLookUp requests. |
Show transparent load balance decisions with logs and metrics | Logs are emitted on a best-effort basis. | We design logging/metrics as separate logical components. We document and share major log messages and metrics for all important load balance events. |
Added ServiceConfiguration
```properties
### --- Load balancer extension --- ###

# Option to enable the debug mode for the load balancer logics.
# The debug mode prints more logs to provide more information such as load balance states and decisions.
# (only used in load balancer extension logics)
loadBalancerDebugModeEnabled=false

# The target standard deviation of the resource usage across brokers
# (100% resource usage is 1.0 load).
# The shedder logic tries to distribute bundle load across brokers to meet this target std.
# The smaller value will incur load balancing more frequently.
# (only used in load balancer extension TransferShedder)
loadBalancerBrokerLoadTargetStd=0.25

# Threshold to the consecutive count of fulfilled shedding(unload) conditions.
# If the unload scheduler consecutively finds bundles that meet unload conditions
# many times bigger than this threshold, the scheduler will shed the bundles.
# The bigger value will incur less bundle unloading/transfers.
# (only used in load balancer extension TransferShedder)
loadBalancerSheddingConditionHitCountThreshold=3

# Option to enable the bundle transfer mode when distributing bundle loads.
# On: transfer bundles from overloaded brokers to underloaded brokers
#     (pre-assigns the destination broker upon unloading).
# Off: unload bundles from overloaded brokers
#     (post-assigns the destination broker upon lookups).
# (only used in load balancer extension TransferShedder)
loadBalancerTransferEnabled=true

# Maximum number of brokers to unload bundle load for each unloading cycle.
# The bigger value will incur more unloading/transfers for each unloading cycle.
# (only used in load balancer extension TransferShedder)
loadBalancerMaxNumberOfBrokerSheddingPerCycle=3

# Delay (in seconds) to the next unloading cycle after unloading.
# The logic tries to give enough time for brokers to recompute load after unloading.
# The bigger value will delay the next unloading cycle longer.
# (only used in load balancer extension TransferShedder)
loadBalanceSheddingDelayInSeconds=180

# Broker load data time to live (TTL in seconds).
# The logic tries to avoid (possibly unavailable) brokers with outdated load data,
# and those brokers will be ignored in the load computation.
# When tuning this value, please consider loadBalancerReportUpdateMaxIntervalMinutes.
# The current default is loadBalancerReportUpdateMaxIntervalMinutes * 2.
# (only used in load balancer extension TransferShedder)
loadBalancerBrokerLoadDataTTLInSeconds=1800

# Max number of bundles in bundle load report from each broker.
# The load balancer distributes bundles across brokers,
# based on topK bundle load data and other broker load data.
# The bigger value will increase the overhead of reporting many bundles in load data.
# (only used in load balancer extension logics)
loadBalancerMaxNumberOfBundlesInBundleLoadReport=10

# Service units'(bundles) split interval. Broker periodically checks whether
# some service units(e.g. bundles) should split if they become hot-spots.
# (only used in load balancer extension logics)
loadBalancerSplitIntervalMinutes=1

# Max number of bundles to split to per cycle.
# (only used in load balancer extension logics)
loadBalancerMaxNumberOfBundlesToSplitPerCycle=10

# Threshold to the consecutive count of fulfilled split conditions.
# If the split scheduler consecutively finds bundles that meet split conditions
# many times bigger than this threshold, the scheduler will trigger splits on the bundles
# (if the number of bundles is less than loadBalancerNamespaceMaximumBundles).
# (only used in load balancer extension logics)
loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3

# After this delay, the service-unit state channel tombstones any service units (e.g., bundles)
# in semi-terminal states. For example, after splits, parent bundles will be `deleted`,
# and then after this delay, the parent bundles' state will be `tombstoned`
# in the service-unit state channel.
# Pulsar does not immediately remove such semi-terminal states
# to avoid unnecessary system confusion,
# as the bundles in the `tombstoned` state might temporarily look available to reassign.
# Rarely, one could lower this delay in order to aggressively clean
# the service-unit state channel when there are a large number of bundles.
# minimum value = 30 secs
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600

# Option to automatically unload namespace bundles with affinity(isolation)
# or anti-affinity group policies.
# Such bundles are not ideal targets to auto-unload as destination brokers are limited.
# (only used in load balancer extension logics)
loadBalancerSheddingBundlesWithPoliciesEnabled = false

# Time to wait before fixing any stuck in-flight service unit states.
# The leader monitor fixes any in-flight service unit(bundle) states
# by reassigning the ownerships if stuck too long, longer than this period.
# (only used in load balancer extension logics)
loadBalancerInFlightServiceUnitStateWaitingTimeInMillis = 30000;

# The service unit(bundle) state channel is periodically monitored
# by the leader broker at this interval
# to fix any orphan bundle ownerships, stuck in-flight states, and other cleanup jobs.
# `loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds` * 1000 must be bigger than
# `loadBalancerInFlightServiceUnitStateWaitingTimeInMillis`.
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateMonitorIntervalInSeconds = 60;
```