Background knowledge

  • Pulsar broker load balancer periodically unloads bundles from overloaded brokers. During this unload process, previous owner brokers close topic sessions(e.g. producers, subscriptions(consumers), managed ledgers). When re-assigned, new owner brokers recreate the topic sessions.

  • Pulsar clients request CommandLookupTopic to lookup or assign owner brokers for topics and connect to them.

  • PIP-192, the extensible load balancer introduced the bundle state channel that event-sources this unloading process in a state machine manner, from releasing, assigned, to owned state order. At releasing, the owner broker “releases” the bundle ownership(close topic sessions).

  • PIP-192, the extensible load balancer introduced TransferShedder, a new shedding strategy, which pre-assigns new owner brokers beforehand.

Motivation

  • When unloading closes many topic sessions, then many clients need to request CommandLookupTopic at the same time, which could cause many lookup requests on brokers. This unloading process can be further optimized if we can let the client directly connect to the new owner broker without following CommandLookupTopic requests.
  • In the new load balancer(pip-192), since the owner broker is already known, we can modify the close command protocol to pass the new destination broker URL and skip the lookup requests.
  • Also, when unloading, we can gracefully shutdown ledgers -- we always close old managed ledgers first and then recreate it on the new owner without conflicts.

Goals

  • Remove clients' lookup requests in the unload protocol to reduce the publish latency spike and e2e latency spike during unloading and also to resolve bottlenecks (of thundering lookups) when there are a large number of topics in a cluster.
  • Gracefully shutdown managed ledgers before new owners create them to reduce possible race-conditions between ledger close and ledger creations during unloading.

In Scope

  • This change will be added in the extensible load balancer.

Out of Scope

  • This won't change the existing load balancer behavior(modular load manager).

High Level Design

To achieve the goals above, we could modify the bundle transfer protocol by the following. The proposed protocol change is based on the bundle states from PIP-192.

Basically, we could close the ledgers only in the releasing state and finally disconnect clients in the owned state with destination broker urls. The clients will directly connect to the pre-assigned destination broker url without lookups. Meanwhile, during this transfer, any produced messages will be ignored by the source broker.

Current Unload and Lookup Sequence in Extensible Load Balancer

sequenceDiagram
    participant Clients
    participant Owner Broker
    participant New Owner Broker
    participant Leader Broker
    Leader Broker ->> Owner Broker: "state:Releasing:" close topic
    Owner Broker ->> Owner Broker: close broker topic sessions
    Owner Broker ->> Clients: close producers and consumers
    Clients ->> Clients: reconnecting (inital delay 100ms)
    Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
    New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
    Clients ->> Owner Broker: lookup
    Owner Broker ->> Clients: redirect
    Clients ->> New Owner Broker: lookup
    New Owner Broker ->> Clients: return(connected)

Proposed Unload Sequence in Extensible Load Balancer without Lookup

sequenceDiagram
    participant Clients
    participant Owner Broker
    participant New Owner Broker
    participant Leader Broker
    Leader Broker ->> Owner Broker: "state:Releasing:" close topic
    Owner Broker ->> Owner Broker: close broker topic sessions(e.g ledgers) without disconnecting producers/consumers(fenced)
    Clients -->> Owner Broker: message pubs are ignored
    Owner Broker ->> New Owner Broker: "state:Assign:" assign new ownership
    New Owner Broker ->> Owner Broker: "state:Owned:" ack new ownership
    Owner Broker ->> Owner Broker: close the fenced broker topic sessions
    Owner Broker ->> Clients: close producers and consumers (with newOwnerBrokerUrl)
    Clients ->> New Owner Broker: immediately connect

Detailed Design

Design & Implementation Details

  • Modify CommandCloseProducer, CommandCloseConsumer to pass optional brokerServiceUrls
message CommandCloseProducer {
required uint64 producer_id = 1;
required uint64 request_id = 2;
+ optional string assignedBrokerServiceUrl      = 3;
+ optional string assignedBrokerServiceUrlTls   = 4;
}

message CommandCloseConsumer {
required uint64 consumer_id = 1;
required uint64 request_id = 2;
+ optional string assignedBrokerServiceUrl      = 3;
+ optional string assignedBrokerServiceUrlTls   = 4;
}
  • Add new disconnect apis on producer and consumer to pass dstBrokerLookupData
public CompletableFuture<Void> disconnect(Optional<BrokerLookupData> dstBrokerLookupData) {
  • Modify the Topic.close() behavior to optionally skip producers.disconnect() and consumers.disconnect().
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect,
                                          boolean closeWithoutDisconnectingClients) {

Eventual Consistency of Ownership States

This protocol and ownership state checks follow the eventual consistency of the bundle state channel introduced in PIP-192.

After the client connects to the destination broker, the next command(e.g. ProducerCommand) requires the destination broker to check the ownership again against its local table view of the bundle state channel.

Upon this local ownership check, there could be the following scenarios:

Happy case:

  • If the ownership state is owned and the current broker is indeed the owner, the command completes.

Unhappy cases:

  • If the ownership state is owned and the current broker is not the owner, the command fails (the broker returns an error to the client), and the client tries to find the true new owner by lookups. The global bundle state channel is eventually consistent, and the lookups should eventually converge.
  • if the ownership change is still in progress(releasing, assigning), this check will be deferred until the state becomes owned with a timeout.

Failure Recovery of Ownership States

The failure recovery logic relies on the bundle state channel cleanup logic introduced in PIP-192.

When the destination or source broker crashes in the middle of unloading, the leader will find the orphan state and clean the ownership by selecting a new owner, and the client will reconnect to it. During this transfer process, if alive, the source broker will serve the topic according to the protocol described in the PIP.

Public-facing Changes

Public API

Binary protocol

  • Modify CommandCloseProducer, CommandCloseConsumer to pass optional assignedBrokerServiceUrls like the above.

Configuration

CLI

Metrics

Monitoring

Security Considerations

Backward & Forward Compatability

  • We are adding new parameters in the close producer and consumer command protocol, the old client versions should not see the optional destination urls in the close commands. Hence, they will request lookups.

Revert

Upgrade

Alternatives

General Notes

Links

  • Mailing List discussion thread:
  • Mailing List voting thread: