PIP-322: Pulsar Rate Limiting Refactoring

Motivation

The current rate limiting implementation in Apache Pulsar has several known issues that impact performance and accuracy when operating Pulsar clusters under load (detailed in Problems to Address). This proposal outlines a refactor to consolidate multiple existing options into a single improved solution.

The refactor aims to resolve numerous user complaints regarding default Pulsar rate limiting being unusable. In addition, inconsistencies and thread contention with existing rate limiters cause unpredictable throttling and added latency.

Refactoring the built-in implementation will improve multi-tenancy, allow Pulsar to scale to demanding workloads, and address longstanding issues.

Rate limiters act as a conduit to more extensive capacity management and Quality of Service (QoS) controls in Pulsar. They are integral to Pulsar's core multi-tenancy features. This refactoring will pave the way for future enhancements.

Goals

In Scope

  • Preserve current functionality without breaking changes
  • Consolidate the multiple existing rate limiting options into a single, configurable rate limiting solution
  • Remove the separate “precise” rate limiter

Problems to Address

  • High CPU load with default rate limiter
  • High lock contention that impacts shared Netty IO threads and adds latency to unrelated Pulsar topic producers ([Bug] RateLimiter lock contention when use precise publish rate limiter #21442.)
  • Multiple limiting implementations (default, precise) which unnecessarily expose implementation details to users of Pulsar and make the code harder to maintain and improve
  • Inability to limit throughput consistently when using default rate limiter
  • Inconsistent behavior across multiple levels of throttling (broker, namespace, connection)
  • Code maintainability
    • Improve understandability of code

Out of Scope

  • Custom/pluggable rate limiters
  • Additional rate limiting features
  • Cluster-wide capacity management
  • Addressing the shared connection multiplexing problem where throttling multiple independent streams multiplexed on the same connection cannot be consistently throttled by pausing reads on the server side.

Current solution

Rate limiters in Pulsar

In Pulsar, rate limiters are used for a few cases:

  • publisher rate limiting
    • topic level (configured at namespace level or topic level policy with admin api)
    • broker level (configured in broker.conf)
    • resource group level (configured with resource groups admin api)
  • dispatcher rate limiting
  • subscribe rate limiting
  • namespace bundle unloading rate limiting

For producers (“publishers”), there are addition conditions to throttle, besides the rate limiters:

  • limiting pending publish requests per connection, configured with maxPendingPublishRequestsPerConnection in broker configuration
  • limiting memory for publishing messages, configured with maxMessagePublishBufferSizeInMB in broker configuration

Current publisher rate limiters in Pulsar

Pulsar contains two implementations for publisher rate limiters: “default” and “precise”. “precise” is the rate limiter implementation which is used when the broker is configured with preciseTopicPublishRateLimiterEnable=true in broker.conf.

Default publisher rate limiter

In this approach, a sub-second scheduler runs (configured with brokerPublisherThrottlingTickTimeMillis, defaults to 50ms), iterating every topic in the broker and checking if the topic has exceeded its threshold. If so, it will toggle the autoread state of the connection for the client's producer. Concurrently, a separate one-second scheduler resets the counters and re-enables throttled connections. This method results in inaccurate rate limiting. Additionally, this approach can result in increased CPU usage due to the operation of two schedulers which are constantly iterating all topics and toggling autoread states.

Precise publisher rate limiter

In this approach, the rate limit check is done on every send messages request and thus the rate limiting is enforced more accurately. This fixes the main issues of the default rate limiters. However, it introduces a lock contention problem since the rate limiter implementation extensively uses synchronous methods. Since this lock content happens on Netty IO threads, it impacts also unrelated topics on the same broker and causes unnecessary slowdowns as reported by bug #21442.

Publisher Throttling Approach

In the Pulsar binary protocol, the broker's only method of applying backpressure to the client is to pause reads and allow the buffers to fill up. There is no explicit protocol-level, permit-based flow control as there is for consumers.

When the broker throttles a producer, it needs to pause reading on the connection that the client‘s producer is using. This is achieved by setting the Netty channel’s autoread state to false.

The broker cannot reject a message that is already in progress. Pausing reading on the connection prevents the broker from receiving new messages and this throttles publishing. When reading is paused, Netty channel and OS-level TCP/IP buffers will fill up and eventually signal backpressure on the TCP/IP level to the client side.

In the current solution, when the rate limit is exceeded, the autoread state is set to false for all producers. Similarly, when the rate falls below the limit, the autoread state is set to true. When the broker-level limit is exceeded or falls below the limit, all producers in the entire broker are iterated. At the topic level, it‘s for all producers for the topic. In the resource group, it’s all producers part of the resource group.

This current solution therefore spends CPU cycles to iterate through the producers and toggle the autoread flags. It's not necessary to eagerly iterate all producers in a publisher rate limiter. Rate limiting can also be achieved when producers are lazily throttled only after they have sent a publishing send request (CommandSend) to the broker.

It‘s perfectly acceptable to throttle only active producers one by one after new messages have arrived when the rate limit has been exceeded. The end result is the same: the target rate can be kept under the limit. The calculated rate is always an average rate over a longer period of time. This is true in both the existing solution and the proposed solution. Over a very short time span, the observed rate can be extremely high. This all smoothens out when the rate calculation spans over hundreds of milliseconds or longer periods. This is why it’s perfectly fine to throttle producers as they produce messages. The proposed solution accounts all traffic in the rate limiter since the state is preserved over the rate limiting period (1 second) and isn‘t resetted as it is in the current solution which will miss accounting for traffic around the boundary when the limit has exceeded, but the connections haven’t yet been paused. That's yet another reason why the lazy approach is suitable for the new proposed solution.

The externally observable behavior of the rate limiting is actually better than before since it is possible to achieve fairness in a producer throttling solution that is implemented in this approach. Fairness is a general property that is expected in resource sharing approaches such that each resource consumer is given a similar share of the resource. In the current publisher rate limiting solution, there‘s no way to achieve fairness when the rate limit is being exceeded. In the proposed solution, fairness is achieved by using a queue to track which producer is given a turn to produce while the rate limit has been exceeded and producers are throttled. If the rate limit is again exceeded, the producer will be put back into the queue and wait for its turn until it can produce again. In the current solution, the producers are iterated in the order that they appear in the broker’s registry. The producers at the beginning get more chances to produce than the ones that are further down the list. The impact of this is the lack of fairness.

High-Level Design

The proposed refactor will refactor rate limiting internals while preserving existing user-facing public APIs and user-facing behavior. A token bucket algorithm will provide efficient and accurate calculations to throttle throughput.

Multiple built-in options such as “precise” rate limiter will be consolidated under a single solution. Performance issues caused by contention and CPU overhead will be addressed.

Detailed Design

Proposed Solution

Using an asynchronous token bucket algorithm

Token bucket algorithms are a common industry practice for handling traffic shaping. It is well understood and it‘s conceptually simple. A token bucket is simply a counter which is limited to a maximum value, the token bucket’s capacity. New tokens are added to the bucket with the configured rate. The usage consumes tokens from the token bucket. When the token bucket is empty, the usage should be backpressured. In use cases where the already accepted work cannot be rejected, the token value needs to also go to negative values.

Since token bucket algorithm is essentially a counter where new tokens are added based on the time that has elapsed since the last token update, it is possible to implement this algorithm in Java in a lockless, non-blocking way using compare-and-swap (CAS) operations on volatile fields. There is no need for a scheduler to add new tokens since the amount of new tokens to add can be calculated from the elapsed time. This assumption has already been validated in the https://github.com/lhotari/async-tokenbucket repository.

There's no current intention to use async-tokenbucket as a separate library. The AsyncTokenBucket class will be placed directly in the Pulsar code base. The reason to have async-tokenbucket repository separately is to have more detailed performance benchmarks there and a PoC of the high performance.

The purpose of the proof-of-concept async-tokenbucket was to ensure that it has an extremely low overhead which makes it feasible to calculate the amount of tokens in an eventually consistent manner with a configurable resolution, without a scheduler. The token bucket operations won't become a bottleneck since on a Dell XPS 2019 i9 laptop the benchmark showed about 900M token bucket ops/s and on MBP 2023 M3 Max it was around 2500M token bucket ops/s.

Internally AsyncTokenBucket uses an eventual consistent approach to achieve high performance and low overhead. What this means is that the token balance is updated once in every interval of the configured resolutionNanos (16 ms default) or when an explicit update of the balance is requested.

There is no separate scheduled task to add new tokens to the bucket. New tokens are calculated based on the elapsed time since the last update and added to the current tokens balance as part of the token balance update that happens when tokens are consumed, the throttling period is calculated or the token balance is queried.

For example, when tokens are consumed and the balance hasn't been updated in the current interval, new tokens will be calculated and added and limited by the token bucket capacity. The consumed tokens and pending consumed tokens will be flushed and substracted from the balance during the update.

If there was already an update for the tokens balance in the current internal, the consumed tokens are added to the pending consumed tokens LongAdder counter which will get flushed in the token balance update.

This makes the tokens balance eventually consistent. The reason for this design choice is to optimize performance by preventing CAS loop contention which could cause excessive CPU consumption.

Key methods in AsyncTokenBucket:

  • consumeTokens(): Consumes given number of tokens
  • consumeTokensAndCheckIfContainsTokens(): Consumes given number of tokens and checks if any tokens remain
  • containsTokens(): Checks if any tokens remain
  • calculateThrottlingDuration(): Computes how long throttling should last until the token bucket contains at least 16 milliseconds worth of tokens filled with the configured rate.

The token balance in AsyncTokenBucket is eventually consistent and differ from the actual token count by up to 16 milliseconds (default resolutionNanos) worth of consumption. This is not a problem since when the throttling finally happens, the strongly consistent value is used for throttling period calculations and no consumed tokens are missed in the calculations since the token value can go to negative values too. The average rate will smoothen out to meet the target rate of the rate limiter with this eventual consistent solution and it doesn't impact the externally observable behavior.

For unit tests, eventual consistent behavior can be a challenge. For that purpose, its possible to switch the AsyncTokenBucket class to a strongly consistent mode for unit tests by calling static switchToConsistentTokensView and resetToDefaultEventualConsistentTokensView methods on the class.

One notable improvement of AsyncTokenBucket is that it is completely non-blocking and lockless. Using AsyncTokenBucket as the basis for publishing rate limiters in Pulsar will address a severe performance bottleneck in Pulsar with the “precise” rate limiter. This is reported as[Bug] RateLimiter lock contention when use precise publish rate limiter #21442.

Unifying rate limiting implementations and improving code maintainability

In the proposed solution there's no need for separate “default” and “precise” rate limiting options. A single implementation will be used. This improves understandability, code quality and maintainability.

Fixing the inconsistency of multiple levels of throttling

In Pulsar throttling can happen for producers in 5 different ways simultaneously: publisher rate limiting happens at 3 levels: broker, topic, resource group and in addition there's backpressure for limiting number of pending publish requests and limiting memory used for publishing. (detailed in Rate limiters in Pulsar).

When there are 5 different simultaneous conditions for throttling a connection, the connection should be throttled as long as any of these conditions is present. In the current code base, this handling is prone to errors and overly complex. There are also cases where one rate limiter sets the autoread to false and another immediately sets it to true although the connection should remain throttled as long as one of the conditions exists.

The fix for this issue is in the proposal by introducing a new concept ServerCnxThrottleTracker, which will track the “throttle count”. When a throttling condition is present, the throttle count is increased and when it‘s no more present, the count is decreased. The autoread should be switched to false when the counter value goes from 0 to 1 and only when it goes back from 1 to 0 should it set to true again. The autoread flag is no more controlled directly from the rate limiters. Rate limiters are only responsible for their part and it’s ServerCnxThrottleTracker that decides when autoread flag is toggled.

Integrating AsyncTokenBucket with the refactored PublishRateLimiterImpl

In the refactored PublishRateLimiterImpl, there‘s a AsyncTokenBucket instance for the message rate limit and for the bytes rate limit. When the publish operation starts in the broker, it will call the topic’s incrementPublishCount method and pass the reference to the producer that is starting the operation, in addition to the number of messages and the total bytes size of the send request (CommandSend message).

This delegates to a call for all possibly active rate limiters in the topic, at the broker level, at resource group level and at topic level.

For each rate limiter, the PublishRateLimiterImpl's handlePublishThrottling method will be called which also gets the producer reference and the number of message and total bytes size as input.

The rate limiter instance could contain both a message limit and a bytes limit. It will call AsyncTokenBucket's consumeTokensAndCheckIfContainsTokens method for each instance. If either call returns false, it means that the producer that produced the message should be throttled.

Throttling is handled by calling producer‘s incrementThrottleCount method which will be delegated producer’s connection‘s ServerCnxThrottleTracker’s incrementThrottleCount method which was described in the previous section.

The contract of the incrementThrottleCount method is that decrementThrottleCount method should be called when the throttling is no longer needed from an individual PublishRateLimiterImpl instance's perspective.

This is handled by first adding the throttled producer to a queue. A task will be scheduled to handle unthrottling from the queue after the throttling duration which is calculated by calling AsyncTokenBucket‘s calculateThrottlingDuration method. This task will only be scheduled unless there’s an already scheduled task in progress.

When the unthrottling task runs, it will process the unthrottling queue and keep on unthrottling producers while there are available tokens in the token buckets. If the queue isn't empty, it will repeat the cycle by scheduling a new task after the throttling duration calculated with the calculateThrottlingDuration method. This happens until the queue is empty and will start again if more producers are throttled.

The producer‘s connection will get throttled by setting autoread to false ServerCnxThrottleTracker. The PublishRateLimiterImpl instances don’t have to know whether the connection was already throttled due to another effective rate limit being over the limit. ServerCnxThrottleTracker will also handle setting autoread to true once all rate limiters operating on the same connection have unthrottled the producer by calling decrementThrottleCount.

Preserve Public Contracts

  • Avoid breaking existing configs, APIs or client functionality.
  • Handle through internal refactoring.

Public-facing Changes

There are no changes to existing configs, CLI options, monitoring etc. This PIP is about a large change which includes a major refactoring and multiple improvements and bug fixes to rate limiting.

More Detailed Level Design

Please refer directly to the pull request with the proposed changes for the more detailed level changes.

The implementation level detail questions can be handled in the pull request review. The goal is to document low level details directly in the Javadoc and comments so that it serves the code maintainers also in the future.

Links