PIP-430: Pulsar Broker Cache Improvements: Refactoring Eviction and Adding a New Cache Strategy Based on Expected Read Count

Background Knowledge

Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and load on BookKeeper and tiered storage (S3) by serving frequently accessed message data directly from memory.

Key concepts:

  • ManagedLedgerImpl: Manages storage for a single topic partition. Each ManagedLedgerImpl instance has its own EntryCache instance.
  • EntryCache: Stores message entries (payloads) in memory. The default implementation is RangeEntryCacheImpl, which uses a RangeCache internally.
  • RangeCache: A specialized cache that stores entries mapped by their Position (ledgerId, entryId). It supports range-based operations for retrieval and expiration.
  • EntryCacheManager (RangeEntryCacheManagerImpl): A global component that limits the total size of all entry caches in a broker. When the total size exceeds a threshold, it triggers eviction.
  • Cache Eviction Policies: Mechanisms to remove entries from the cache to make space or remove old data.
    • Timestamp-based eviction: Removes entries older than a configured threshold (e.g., managedLedgerCacheEvictionTimeThresholdMillis). This is currently handled periodically by ManagedLedgerFactoryImpl iterating over all managed ledgers.
    • Size-based eviction: Removes entries when the total cache size exceeds a configured limit (managedLedgerCacheSizeMB). The current implementation resides in EntryCacheDefaultEvictionPolicy, which selects a subset of larger caches and proportionally evicts entries from each entry cache to keep the total cache size under the limit.
    • Cursor-based eviction: Invalidates cache entries up to the slowest consumer's read position or mark-delete position, depending on cacheEvictionByMarkDeletedPosition.

The broker cache serves various read patterns:

  • Tailing reads: Consumers reading the latest published messages.
  • Catch-up reads (backlogged cursors): Consumers reading older messages to catch up.
  • Key_Shared subscription reads: Messages with the same key are routed to the same consumer. If a consumer is slow, messages might be replayed while the cursor reads more messages for available consumers.

Motivation

The current Pulsar broker entry cache implementation and its eviction mechanisms face several challenges that impact performance, efficiency, and predictability:

  1. Inefficient and Flawed Size-Based Eviction: The EntryCacheDefaultEvictionPolicy (the current default for size-based eviction) does not guarantee the removal of the oldest entries globally. It sorts individual EntryCache instances by their size, selects a percentage of the largest caches, and then asks each of them to evict a proportional amount of data. This can lead to newer entries being evicted from large, active caches while older, less relevant entries remain in smaller or less active caches, resulting in suboptimal cache utilization and potentially lower hit rates.

  2. Inefficient and Incorrect Timestamp-Based Eviction: The existing timestamp-based eviction mechanism, triggered by ManagedLedgerFactoryImpl, has significant performance and correctness issues:

    • Performance: It iterates through all ManagedLedgerImpl instances and their respective EntryCache instances periodically (default every 10ms). In brokers with a large number of topics, this frequent and exhaustive iteration leads to high CPU utilization and memory pressure.
    • Correctness: The per-cache eviction (e.g., RangeCache.evictLEntriesBeforeTimestamp) often assumes entries within a single RangeCache are primarily ordered by timestamp due to typical append-only workloads. This assumption breaks down with mixed read patterns such as catch-up reads, or when entries are inserted out of their natural position order (the Key_Shared subscription replay queue scenario), potentially leading to incorrect eviction decisions or inefficient scanning.
  3. Limited Cache Scope and Effectiveness for Diverse Read Patterns: The original RangeCache was primarily designed with tailing reads in mind. While support for caching backlogged cursors and replay queue reads was added later, the eviction algorithms were not holistically updated to effectively manage mixed read patterns (tailing, catch-up, replays in Key_Shared). This can lead to:

    • Unnecessary BookKeeper and tiered storage (S3) reads during catch-up scenarios, even if data was recently read for another consumer.
    • Poor cache hit rates for Key_Shared subscriptions with slow consumers, as entries might be evicted before a replayed message (due to consumer unacknowledgment or redelivery request) is read again.
  4. Foundation for Advanced Caching Strategies Needed: The current cache architecture makes it difficult to implement more intelligent caching strategies that could further optimize for common Pulsar use cases, such as efficiently handling fan-out to multiple shared consumers or retaining entries expected to be read by several cursors.

Addressing these issues is crucial for improving broker performance, reducing operational costs (lower BookKeeper load), and providing a more robust caching layer that can adapt to diverse workloads.

Goals

The refactoring aims to make cache eviction more robust, performant, and predictable. The “expected read count” strategy is an attempt to make the cache more aware of Pulsar's specific consumption patterns.

In Scope

  • Refactor Cache Eviction Mechanism:
    • Replace the existing per-cache iteration for timestamp eviction and the EntryCacheDefaultEvictionPolicy for size-based eviction with a centralized, insertion-order aware mechanism.
    • Implement a RangeCacheRemovalQueue that tracks all cached entries globally in approximate insertion order.
    • Ensure timestamp-based eviction reliably removes entries older than the threshold by processing this queue.
    • Ensure size-based eviction globally removes the oldest entries first from this queue until the target size is met.
  • Introduce “Expected Read Count” Cache Strategy:
    • Implement a new caching strategy where entries track an “expected read count,” representing how many active cursors are anticipated to read that entry.
    • Prioritize retaining entries with a positive expected read count during size-based eviction.
    • Provide a new configuration option (cacheEvictionByExpectedReadCount) to enable this strategy.
  • Introduce LRU Cache Behavior:
    • The entry's TTL will be extended by managedLedgerCacheEvictionTimeThresholdMillis during the eviction check if it has been accessed since the last check.
    • This behavior is controlled by the managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed configuration option.
    • This will help keep entries in the cache longer if they are accessed frequently. It will improve cache hit rates for topics with large fan-out consumers in scenarios where the consumers aren't connected at the time that the entry is read and therefore aren't covered by the “expected read count” cache strategy. This is useful when all consumers are reconnecting after a broker restart, a simultaneous network disconnection, or a namespace bundle load balancing event.
  • Improve Performance and Efficiency:
    • Reduce CPU overhead associated with cache eviction, particularly timestamp-based eviction.
    • Improve overall cache hit rates by making better eviction decisions.
  • Enhance Correctness: Ensure eviction policies work correctly across various read patterns.
  • Provide a Foundation for Future Cache Optimizations: The refactored design should make it easier to implement further caching improvements, such as more sophisticated strategies for catch-up reads or Key_Shared subscriptions.
  • Simplify RangeCache Implementation: Remove unnecessary generic type parameters from the RangeCache implementation. Since RangeCache is used exclusively for a single purpose within the Pulsar codebase (caching managed ledger entries), the generic key and value parameters add unnecessary complexity without providing any practical benefit. This simplification will improve code readability and reduce cognitive overhead.

High Level Design

The proposed solution involves two main components: a refactored eviction mechanism using a centralized removal queue, and a new cache strategy based on expected read count, complemented by LRU-like cache behavior.

1. Centralized Cache Eviction with RangeCacheRemovalQueue

A new component, RangeCacheRemovalQueue, will be introduced at the EntryCacheManager level.

  • Entry Tracking: When an entry is inserted into any RangeEntryCacheImpl (the per-ledger cache), a lightweight wrapper for this entry (RangeCacheEntryWrapper) is also added to this global RangeCacheRemovalQueue. This queue maintains entries in their global insertion order (FIFO). This wrapper is already necessary in RangeEntryCacheImpl to prevent consistency issues. The current internal wrapper class is refactored to a top-level class so that it can be used with the removal queue.
  • Timestamp-Based Eviction: A single, periodic task (e.g., managed by ManagedLedgerFactoryImpl's cacheEvictionExecutor) will process the RangeCacheRemovalQueue. It will iterate from the head of the queue, removing entries whose timestampNanos are older than the cacheEvictionTimeThresholdNanos. Since the queue is insertion-ordered, this process can often stop early once it encounters an entry that is not expired.
  • Size-Based Eviction: When the EntryCacheManager detects that the total cache size exceeds evictionTriggerThresholdPercent * maxSize, it will trigger an eviction cycle. This cycle will also process the RangeCacheRemovalQueue from the head, removing the oldest entries (regardless of which specific ledger they belong to) until the cache size is brought down to cacheEvictionWatermark * maxSize.
  • Requeuing Mechanism: During time-based or size-based eviction, an entry that should not be evicted immediately (e.g., due to a positive “expected read count” as per the new strategy) is temporarily “requeued” (moved to the end of the removalQueue). Requeued entries are reconsidered in subsequent eviction passes; the number of TTL extensions granted to entries with remaining expected reads is capped by the managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes setting, after which they eventually expire. This prevents premature eviction of entries that are likely to be read again soon. Similarly, when managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed is set to true, an entry that has been accessed since the last eviction check is requeued to the end of the removalQueue and its TTL is extended by managedLedgerCacheEvictionTimeThresholdMillis, similar to LRU cache behavior.

This centralized approach replaces the distributed, per-cache iteration for timestamp eviction and the less precise EntryCacheDefaultEvictionPolicy for size eviction, leading to more globally optimal and efficient eviction decisions.
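
The following minimal sketch illustrates a timestamp-eviction pass with requeuing; the wrapper fields and structure are simplified illustrations, not the actual implementation:

import org.jctools.queues.MpscUnboundedArrayQueue;

// Illustrative sketch only: a single-consumer eviction pass over the FIFO
// removal queue, with requeuing that extends an entry's TTL.
class RemovalQueueSketch {
    static class EntryWrapper {
        long timestampNanos;      // insertion time, extended on requeue
        int requeueCount;         // number of TTL extensions granted so far
        boolean recentlyAccessed; // set on cache hit, cleared on eviction check
        boolean hasExpectedReads; // expectedReadCount > 0

        void removeFromCache() {
            // evict the wrapped entry from its RangeEntryCacheImpl
        }
    }

    final MpscUnboundedArrayQueue<EntryWrapper> removalQueue =
            new MpscUnboundedArrayQueue<>(1024);

    void evictEntriesBefore(long expirationTimestampNanos, long ttlNanos, int maxRequeues) {
        EntryWrapper head;
        while ((head = removalQueue.peek()) != null) {
            if (head.timestampNanos >= expirationTimestampNanos) {
                break; // FIFO order: all remaining entries are newer, stop early
            }
            removalQueue.poll();
            if (head.recentlyAccessed
                    || (head.hasExpectedReads && head.requeueCount < maxRequeues)) {
                head.recentlyAccessed = false;
                head.timestampNanos += ttlNanos; // extend TTL so the pass terminates
                head.requeueCount++;
                removalQueue.offer(head);        // requeue at the tail instead of evicting
            } else {
                head.removeFromCache();
            }
        }
    }
}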

S3FIFO Cache Eviction Algorithm Influence

This FIFO queue-based solution is influenced by the S3FIFO cache eviction algorithm (algorithm explanation), although it is not an implementation of S3FIFO. It accomplishes a similar goal as S3FIFO, but for the managed ledger broker cache. The goal of S3FIFO is to quickly remove entries that are unlikely to be accessed in the future, and the PIP-430 broker cache removal queue aims to do the same quickly and efficiently. The reason S3FIFO itself is not implemented in the broker cache is that the broker can use the “expected read count” concept to determine whether an entry is likely to be accessed in the future. This information is not available to generic cache implementations, and the broker cache can use it to keep entries that are likely to be accessed again in the cache longer.

The S3FIFO algorithm explanation blog post states: “While the eviction algorithms so far have been centered around LRU, I believe modern eviction algorithms should be designed with FIFO queues.” This is one of the reasons why the PIP-430 broker cache removal queue is also based on a FIFO queue.

2. “Expected Read Count” Cache Strategy

This new strategy aims to improve cache hit rates by retaining entries that are likely to be read by multiple active consumers/cursors.

  • EntryReadCountHandler: Each cached entry (EntryImpl) will be associated with an EntryReadCountHandlerImpl. This handler maintains an expectedReadCount (an atomic integer).
  • Initialization:
    • When a new entry is added to the ledger (OpAddEntry), its expectedReadCount is initialized to the number of active cursors.
    • When entries are read from BookKeeper or tiered storage and inserted into the cache (RangeEntryCacheImpl.readFromStorage), the expectedReadCount is initialized from the number of active cursors positioned before or at the entry being added. This information is sourced from ActiveManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor).
  • Dynamic Updates:
    • When an entry is actually delivered to a consumer from the cache and subsequently released by the delivery mechanism, its expectedReadCount is decremented (via EntryReadCountHandler.markRead()).
    • The expectedReadCount on a cached entry is incremented when the entry is added to the replay queue. This will de-prioritize the removal of the entry from the cache when size-based eviction is performed so that when the Key_Shared consumer is available to read the entry, it would more likely be available in the cache.
  • Eviction Consideration:
    • The RangeCacheRemovalQueue's size-based eviction logic will consult EntryImpl.hasExpectedReads(). This method returns true if an expected read count has been set for the entry and its value is positive (expectedReadCount > 0).
    • If an entry is forcefully removed from the cache (e.g., ledger deletion, or direct invalidation call that bypasses normal eviction), the entry is simply removed.
  • Configuration:
    • cacheEvictionByExpectedReadCount (boolean, default: true): Enables the new eviction strategy based on expected read count. When true, entries with expectedReadCount > 0 are less likely to be evicted by size-based eviction unless they also meet timestamp expiration.
    • managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes (Integer, default: 5): Maximum number of times the cache can extend the TTL of an entry that has remaining expected reads. Only takes effect when cacheEvictionByExpectedReadCount is enabled.
    • managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed (boolean, default: true): Controls whether recently accessed entries in the managed ledger cache should have their lifetime extended before cache eviction.

This strategy allows the cache to be more intelligent about retaining entries that have higher utility, especially in fan-out scenarios or with slightly lagging consumers in Key_Shared subscriptions. The replay queue is also used in Shared subscriptions when more entries are read than can be consumed by the available consumers. This strategy will also avoid cache misses in those scenarios.

Detailed Design

Design & Implementation Details

1. RangeCacheRemovalQueue

  • Structure:
    • removalQueue: org.jctools.queues.MpscUnboundedArrayQueue<RangeCacheEntryWrapper> to hold entries in insertion order in a FIFO queue.
    • EvictionPredicate determines the EvictionResult for each entry (a sketch follows this list):
      • REMOVE: Entry is evicted.
      • REQUEUE: Entry is moved to the tail of the removalQueue (which extends the TTL by managedLedgerCacheEvictionTimeThresholdMillis).
      • STOP: Stops processing further entries in the current pass (e.g., for timestamp eviction when a non-expired entry is found, or for size eviction when enough space is freed).
      • MISSING: Entry was already removed from cache by other means.
    • evictLEntriesBeforeTimestamp(long timestampNanos): Uses a predicate to remove entries whose timestampNanos is older than the provided timestamp. It requeues entries if extendTTLOfRecentlyAccessed is enabled and the entry has been accessed since the last eviction check or if the entry has remaining expected reads and the requeue count is less than the configured maximum.
    • evictLeastAccessedEntries(long sizeToFree, long expirationTimestampNanos): Uses a predicate to remove the least accessed entries until the specified size is freed. It considers the extendTTLOfRecentlyAccessed and expected read count when determining eviction eligibility.
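
A sketch of the EvictionPredicate contract for the timestamp-based pass, using the EvictionResult values listed above; the wrapper accessor names are assumptions, not the actual API:

// Sketch of the EvictionPredicate contract; accessor names are illustrative.
final class EvictionPredicateSketch {
    enum EvictionResult { REMOVE, REQUEUE, STOP, MISSING }

    // Illustrative read-only view of RangeCacheEntryWrapper
    interface WrapperView {
        boolean isAlreadyRemoved(); // removed from the cache by other means
        long timestampNanos();
        boolean shouldExtendTtl();  // recently accessed, or remaining expected reads under the requeue cap
    }

    @FunctionalInterface
    interface EvictionPredicate {
        EvictionResult eval(WrapperView wrapper);
    }

    // Timestamp-based eviction: processing stops at the first non-expired entry.
    static EvictionPredicate timestampPredicate(long expirationTimestampNanos) {
        return w -> {
            if (w.isAlreadyRemoved()) {
                return EvictionResult.MISSING;
            }
            if (w.timestampNanos() >= expirationTimestampNanos) {
                return EvictionResult.STOP;
            }
            if (w.shouldExtendTtl()) {
                return EvictionResult.REQUEUE;
            }
            return EvictionResult.REMOVE;
        };
    }
}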

2. EntryImpl Modifications and EntryReadCountHandler

The values stored inside a RangeCache are EntryImpl instances (they implement ReferenceCountedEntry). EntryImpl has been modified to hold a reference to an EntryReadCountHandler instance which handles the state related to read count. EntryReadCountHandler is used as an abstraction and integration between RangeCacheRemovalQueue, EntryImpl and the Managed Ledger layer. The implementation of EntryReadCountHandler holds the state in a single volatile int field.

public interface EntryReadCountHandler {
    int getExpectedReadCount();

    void incrementExpectedReadCount();

    void markRead();

    default boolean hasExpectedReads() {
        return getExpectedReadCount() >= 1;
    }
}
  • EntryReadCountHandler interface:
    • int getExpectedReadCount() returns the current expected read count.
    • void incrementExpectedReadCount() increments the count.
    • void markRead(): Decrements the count.
  • EntryReadCountHandlerImpl class:
    • Uses AtomicIntegerFieldUpdater for expectedReadCount (see the sketch below).
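
A minimal sketch of EntryReadCountHandlerImpl along these lines; the never-below-zero clamp in markRead() is an illustrative assumption, and details may differ from the actual class:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Minimal sketch: state in a single volatile int, updated via AtomicIntegerFieldUpdater.
class EntryReadCountHandlerImpl implements EntryReadCountHandler {
    private static final AtomicIntegerFieldUpdater<EntryReadCountHandlerImpl> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(EntryReadCountHandlerImpl.class, "expectedReadCount");

    private volatile int expectedReadCount;

    EntryReadCountHandlerImpl(int initialExpectedReadCount) {
        this.expectedReadCount = initialExpectedReadCount;
    }

    @Override
    public int getExpectedReadCount() {
        return expectedReadCount;
    }

    @Override
    public void incrementExpectedReadCount() {
        UPDATER.incrementAndGet(this);
    }

    @Override
    public void markRead() {
        // decrement, but never below zero (illustrative clamp)
        UPDATER.updateAndGet(this, current -> current > 0 ? current - 1 : 0);
    }
}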

3. ActiveManagedCursorContainer and ActiveManagedCursorContainerImpl

To efficiently track the number of active cursors at or before a given position, a new ActiveManagedCursorContainer interface is introduced. The ActiveManagedCursorContainerImpl class implements the tracking with a sorted linked list; JMH benchmarks show this is efficient for the expected use cases, allowing O(1) access to the number of cursors at a given position. ActiveManagedCursorContainerImpl is only used when cacheEvictionByExpectedReadCount is enabled. In other cases, the previous ManagedCursorContainer implementation is used, renamed to ManagedCursorContainerImpl so that the implementation is decoupled from the interface. JMH benchmarks also show that the new ActiveManagedCursorContainerImpl currently outperforms ManagedCursorContainerImpl for all use cases. This indicates a performance improvement opportunity in ManagedCursorContainerImpl, but addressing it is not a priority for this PIP.

4. Integration with ManagedLedgerImpl

  • OpAddEntry:
    • When an entry is successfully added to a ledger:
      if (!(ml instanceof ShadowManagedLedgerImpl)) {
          int activeCursorCount = ml.getActiveCursors().size();
          if (activeCursorCount > 0) {
              int expectedReadCount = 0;
              if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
                  // For newly added entries, all active cursors are effectively "before" the added entry for future reads.
                  expectedReadCount = activeCursorCount;
              }
              EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount);
              entry.setDecreaseReadCountOnRelease(false); // Cache owns the primary read count handling now
              ml.entryCache.insert(entry);
              entry.release();
          }
      }
      
  • ManagedLedgerImpl.asyncReadEntry:
    • The expectedReadCount parameter (an IntSupplier) is passed down to RangeEntryCacheImpl.asyncReadEntry0.
    • The IntSupplier would resolve to opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore() when cacheEvictionByExpectedReadCount is true.
  • RangeEntryCacheImpl.readFromStorage:
    • Accepts an IntSupplier expectedReadCount.
    • When creating EntryImpl from LedgerEntry, it passes expectedReadCount.getAsInt() to EntryImpl.create.
  • EntryImpl.release():
    • A new flag decreaseReadCountOnRelease (default false) is added to EntryImpl.
    • When EntryImpl.create(Entry other) (copy constructor for dispatch) is called, this flag is set to true on the copy.
    • EntryImpl.beforeDeallocate(): if decreaseReadCountOnRelease is true and readCountHandler is not null, call readCountHandler.markRead(). This ensures that the read count on the original cached entry's handler is decremented only when an entry copy created for dispatch is released (see the sketch following this list).
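
The read-count decrement on dispatch can be sketched as follows, with a simplified stand-in for EntryImpl (the real class is a Netty reference-counted object):

// Illustrative flow of the read-count decrement on dispatch.
class CachedEntrySketch {
    final EntryReadCountHandler readCountHandler;
    boolean decreaseReadCountOnRelease; // stays false on the cached original

    CachedEntrySketch(EntryReadCountHandler readCountHandler) {
        this.readCountHandler = readCountHandler;
    }

    // Models EntryImpl.create(Entry other): the dispatch copy shares the
    // handler and is flagged to decrement the read count on release.
    CachedEntrySketch copyForDispatch() {
        CachedEntrySketch copy = new CachedEntrySketch(readCountHandler);
        copy.decreaseReadCountOnRelease = true;
        return copy;
    }

    // Models EntryImpl.beforeDeallocate(), invoked when the reference count drops to zero.
    void beforeDeallocate() {
        if (decreaseReadCountOnRelease && readCountHandler != null) {
            readCountHandler.markRead();
        }
    }
}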

5. RangeEntryCacheManagerImpl

  • doCacheEviction: Called by the global eviction scheduler.
    • Invokes evictionHandler.invalidateEntriesBeforeTimestampNanos. This also has to take the separate managedLedgerCacheEvictionTimeThresholdMillis and managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes settings into account.
    • Calls doEvictToWatermarkWhenOverThreshold() to ensure cache size is within limits.
  • triggerEvictionWhenNeeded(): Called after an entry is added.
    • If size > threshold, schedules doEvictToWatermarkWhenOverThreshold() on cacheEvictionExecutor.
  • doEvictToWatermarkWhenOverThreshold():
    • Calculates sizeToEvict (see the sketch after this list).
    • Calls evictionHandler.evictEntries(sizeToEvict, expirationTimestampNanosForNonEvictableEntries).
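
A sketch of the trigger arithmetic: for example, with maxSize of 1024MB and a watermark of 0.9, an eviction cycle frees currentSize - 922MB. The method names below are placeholders, not actual methods:

// Sketch of the size-based trigger arithmetic described above.
class SizeEvictionTriggerSketch {
    void triggerEvictionWhenNeeded(long currentSize, long maxSize,
                                   double evictionTriggerThresholdPercent,
                                   double cacheEvictionWatermark) {
        if (currentSize > evictionTriggerThresholdPercent * maxSize) {
            // Evict enough to bring the cache back down to the watermark.
            long sizeToEvict = currentSize - (long) (cacheEvictionWatermark * maxSize);
            scheduleEvictionOnExecutor(sizeToEvict);
        }
    }

    void scheduleEvictionOnExecutor(long sizeToEvict) {
        // placeholder: submit doEvictToWatermarkWhenOverThreshold() to cacheEvictionExecutor
    }
}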

Note About Entry Cache Size and How It's Related to Broker Cache Total Size

The calculation of the entry cache size is not accurate unless the managedLedgerCacheCopyEntries setting is set to true. This occurs because when the entry payload is read from BookKeeper or received from a Pulsar client publisher, it is “sliced” from an underlying buffer. The actual memory consumption can be higher when the other entries of the shared underlying buffer have been released and there is a single entry that retains the complete underlying buffer.
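
A small standalone Netty example illustrates the retention effect (illustrative only, not broker code):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

// Standalone illustration: a small retained slice keeps the whole shared
// receive buffer alive, so the accounted entry size understates real memory use.
public class SliceRetentionDemo {
    public static void main(String[] args) {
        ByteBuf shared = PooledByteBufAllocator.DEFAULT.directBuffer(64 * 1024);
        shared.writeZero(64 * 1024);

        // A cached "entry" is a small slice of the shared buffer.
        ByteBuf entry = shared.retainedSlice(0, 128);

        // Releasing the shared buffer does not free its memory: the 128-byte
        // slice still pins the full 64kB allocation.
        shared.release();
        System.out.println("entry refCnt=" + entry.refCnt()); // 1

        entry.release(); // only now can the underlying memory be reclaimed
    }
}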

For entries received from a Pulsar client publisher, the buffer size is between 16kB and 1MB. This is currently not configurable and is set in code in the BrokerService.defaultServerBootstrap method.

        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
            new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

The actual underlying buffer size will also depend on Netty's LengthFieldBasedFrameDecoder (extends ByteToMessageDecoder) logic, which will cumulate buffers by copying until the complete entry has been read. In that case, the underlying buffer size can be larger than 1MB. The cumulated buffer will be shared across multiple entries since entry buffers are sliced from the cumulated buffer.

For entries received from BookKeeper, it's similar. However, the BookKeeper client defaults to Netty's default channel config settings for RCVBUF_ALLOCATOR, which has a maximum buffer size of 64kB.

It would be useful to make the Pulsar broker side RCVBUF_ALLOCATOR parameters configurable so that they could be tuned to a smaller size; it is very unlikely that the 1MB maximum size improves performance significantly. There is a performance tradeoff in the case where messages are large and buffers have to be merged by copying in the LengthFieldBasedFrameDecoder. To avoid this overhead, it would be possible to make LengthFieldBasedFrameDecoder use composite buffers for merging by setting the cumulator property to ByteToMessageDecoder.COMPOSITE_CUMULATOR; however, that could have a negative performance impact on code that expects the complete payload to be a single contiguous buffer.
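
As a hypothetical illustration of that cumulator tweak (not something this proposal changes; the constructor arguments are made up for the example):

import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// Hypothetical tweak only: configure the frame decoder to merge cumulated
// buffers without copying. Arguments are illustrative, not Pulsar's settings.
final class CompositeCumulatorExample {
    static LengthFieldBasedFrameDecoder createFrameDecoder() {
        LengthFieldBasedFrameDecoder decoder =
                new LengthFieldBasedFrameDecoder(5 * 1024 * 1024, 0, 4, 0, 4);
        decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
        return decoder;
    }
}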

Since in many cases the underlying buffer from which a single entry's slice is taken is shared with entries that were published or retrieved together, it is also common for these entries to get evicted together. Making the RCVBUF_ALLOCATOR settings configurable in the Pulsar broker is therefore a sufficient mitigation. In cases where shared buffers add significant memory overhead, it can be reduced by setting the maximum size for AdaptiveRecvByteBufAllocator to 64kB, with the tradeoff of possible unnecessary buffer copies for messages exceeding 64kB. In the BookKeeper server, the settings byteBufAllocatorSizeInitial, byteBufAllocatorSizeMin and byteBufAllocatorSizeMax configure the AdaptiveRecvByteBufAllocator parameters. The naming of these parameters in BookKeeper isn't optimal since they are specifically about AdaptiveRecvByteBufAllocator and not general “byteBufAllocator” parameters.

The proposal would be to add these configuration parameters to broker.conf for controlling Broker's AdaptiveRecvByteBufAllocator parameters:

# Netty adaptive receive buffer allocator's minimum size
brokerAdaptiveRecvByteBufAllocatorMinimumSize=1024
# Netty adaptive receive buffer allocator's initial size
brokerAdaptiveRecvByteBufAllocatorInitialSize=16384
# Netty adaptive receive buffer allocator's maximum size
# Tune this value to a lower value to reduce overhead of the entries cached in the Broker cache due to shared underlying buffers
brokerAdaptiveRecvByteBufAllocatorMaximumSize=1048576

Public-facing Changes

Configuration

  • New Configuration: broker.conf (a combined example excerpt follows at the end of this Configuration section)

    • cacheEvictionByExpectedReadCount (boolean): Enables the new eviction strategy based on expected read count. When true, entries with expectedReadCount > 0 are less likely to be evicted by size-based eviction unless they also meet timestamp expiration. Default: true.
    • managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes (int): Maximum number of times the cache can extend the TTL of an entry that has remaining expected reads. Only takes effect when cacheEvictionByExpectedReadCount is enabled. This helps optimize cache efficiency for scenarios like:
      • Key_Shared subscription replays

      • Catch-up reads for lagging consumers

      • Consumers temporarily falling behind the tail

        Entries with remaining expected reads will have their TTL extended up to this many times before being eligible for eviction. The TTL will be extended by managedLedgerCacheEvictionTimeThresholdMillis each time. Default: 5.

    • managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed (boolean): Controls whether recently accessed entries in the managed ledger cache should have their lifetime extended before cache eviction.
      • When enabled:
        • During eviction check, if an entry has been accessed since the last check, its expiration time will be extended by managedLedgerCacheEvictionTimeThresholdMillis
        • Makes the cache behave like a Least Recently Used (LRU) cache by keeping frequently accessed entries longer
        • Helps optimize performance for frequently accessed entries while still allowing old unused entries to be evicted
        • Minimum eviction time is 2x managedLedgerCacheEvictionTimeThresholdMillis
      • When disabled:
        • Cache behaves more like a FIFO queue with time-based and size-based eviction
        • Minimum eviction time is managedLedgerCacheEvictionTimeThresholdMillis
      • Default: true.
    • managedLedgerContinueCachingAddedEntriesAfterLastActiveCursorLeavesMillis (long):
      • This setting configures how long the broker continues to cache newly added entries while there are no active cursors: after the last active cursor has left, or immediately after initialization when the persistent topic and its managed ledger are loaded. This setting is ignored unless cacheEvictionByExpectedReadCount is enabled.
      • Default: 2 * managedLedgerCacheEvictionTimeThresholdMillis.
  • Modified Behavior of Existing Configurations:

    • managedLedgerCacheEvictionTimeThresholdMillis (long): Controls time-to-live (TTL) for entries in the managed ledger (broker) cache. The TTL can be extended in two ways:
      • When cacheEvictionByExpectedReadCount is enabled: TTL is extended for entries with remaining expected reads. The maximum number of extensions is controlled by managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes.
      • When managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed is enabled: TTL is extended for entries accessed since the last expiration check.
      • Default: 1000.
    • managedLedgerCacheSizeMB: Still applies. The new size-based eviction will use the RangeCacheRemovalQueue.
    • cacheEvictionByMarkDeletedPosition: If cacheEvictionByExpectedReadCount is true, this setting's direct influence on preserving entries is diminished, as expectedReadCount provides more granular control. However, mark-delete position updates still occur. If cacheEvictionByExpectedReadCount is false, this setting functions as before (though the underlying eviction for “up to slowest reader” now also uses the central queue).
    • managedLedgerCursorBackloggedThreshold, managedLedgerMinimumBacklogCursorsForCaching, managedLedgerMinimumBacklogEntriesForCaching, managedLedgerMaxBacklogBetweenCursorsForCaching: These settings primarily determine whether a cursor is considered “active” for caching purposes (i.e., ManagedCursorImpl.isCacheReadEntry()).
      • If cacheEvictionByExpectedReadCount is true, these settings are ignored since they would conflict with the expected read count implementation and would not benefit the caching algorithm.
      • The current implementation using managedLedgerCursorBackloggedThreshold and related configuration parameters relies on periodic checks of checkBackloggedCursors and checkCursorsToCacheEntries, which occur once per minute during statistics updates.
        • This approach is insufficient for handling scenarios where a large number of consumers with backlogged subscriptions connect simultaneously. The calculation logic is also costly to perform for a large number of topics and cursors.
        • The cacheEvictionByExpectedReadCount solution eliminates the need for these settings since the efficient eviction algorithm can adapt to cases where far-behind backlogged cursors unnecessarily cause entries to be cached longer than necessary. The size-based eviction will efficiently remove entries in a prioritized manner, ensuring that entries with a positive expected read count are retained longer, while entries with no expected reads are evicted more aggressively.
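
For reference, an example broker.conf excerpt that sets the new options to their documented defaults:

# New cache strategy based on expected read count
cacheEvictionByExpectedReadCount=true
# Maximum number of TTL extensions for entries with remaining expected reads
managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes=5
# LRU-like behavior: extend the TTL of entries accessed since the last eviction check
managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed=true
# Defaults to 2 * managedLedgerCacheEvictionTimeThresholdMillis (2000 with the default TTL of 1000)
managedLedgerContinueCachingAddedEntriesAfterLastActiveCursorLeavesMillis=2000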

Metrics & Monitoring

Existing broker cache metrics will continue to function, reflecting the behavior of the new eviction system and broker cache strategy.

Backward & Forward Compatibility

There are no compatibility concerns since the broker cache is purely a runtime concern, local to each individual broker.

Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

  • This PIP does not directly interact with or change geo-replication mechanisms. Cache behavior is local to each broker in each cluster. Compatibility considerations are the same as for a standalone broker.

Alternatives

  1. Simple LRU (Least Recently Used) for Global Cache:

    • A global LRU cache could be considered.
    • Reason for rejection: Standard LRU doesn't naturally account for Pulsar's specific read patterns where an entry might be predictably read multiple times (e.g., by different consumers in a shared subscription after a backlog is cleared, or by Key_Shared consumers). The “expected read count” provides a more domain-specific heuristic. Moreover, managing a global LRU across all topics with high concurrency and varying entry sizes can be complex and may require significant locking or sophisticated concurrent data structures, potentially negating performance gains. The per-ledger cache structure with a central eviction coordinator is a less disruptive change.
  2. Priority Queue for RangeCacheRemovalQueue:

    • Instead of a simple FIFO queue, a priority queue ordered by expectedReadCount (ascending) and then timestamp (ascending) could be used.
    • Reason for rejection (for this PIP): While potentially more precise, managing priorities in a queue where expectedReadCount can change dynamically for entries already in the queue adds significant complexity. Updates would require O(log N) or O(N) operations depending on the queue implementation and how updates are handled. The current MPSC queue, with a requeue “stash” for non-evictable items, offers a simpler, lower-overhead approach for the initial refactoring. Performance of MPSC queues is very high for enqueue/dequeue. The requeue “stash” handles the “deprioritizing” of eviction for entries with positive read counts without complex queue reordering. This can be revisited if the requeue mechanism proves insufficient.

Links