Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and load on BookKeeper and tiered storage (S3) by serving frequently accessed message data directly from memory.
Key concepts:

- `ManagedLedgerImpl`: Manages storage for a single topic partition. Each `ManagedLedgerImpl` instance has its own `EntryCache` instance.
- `EntryCache`: Stores message entries (payloads) in memory. The default implementation is `RangeEntryCacheImpl`, which uses a `RangeCache` internally.
- `RangeCache`: A specialized cache that stores entries mapped by their `Position` (ledgerId, entryId). It supports range-based operations for retrieval and expiration.
- `EntryCacheManager` (`RangeEntryCacheManagerImpl`): A global component that limits the total size of all entry caches in a broker. When the total size exceeds a threshold, it triggers eviction.

Cache eviction currently happens in several ways:

- Timestamp-based eviction: Entries older than a configured threshold (`managedLedgerCacheEvictionTimeThresholdMillis`) are removed. This is currently handled periodically by `ManagedLedgerFactoryImpl` iterating over all managed ledgers.
- Size-based eviction: Triggered when the total cache size exceeds the configured limit (`managedLedgerCacheSizeMB`). The current implementation resides in `EntryCacheDefaultEvictionPolicy`, which selects a subset of larger caches and proportionally evicts entries from each entry cache to keep the total cache size under the limit.
- Cursor progress-based invalidation: Cached entries are invalidated as consumers progress, up to the slowest reader's position or the mark-delete position, depending on `cacheEvictionByMarkDeletedPosition`.

The broker cache serves various read patterns: tailing reads, catch-up reads, and replay reads (for example, the replay queue of Key_Shared subscriptions).
The current Pulsar broker entry cache implementation and its eviction mechanisms face several challenges that impact performance, efficiency, and predictability:
- Inefficient and Flawed Size-Based Eviction: The `EntryCacheDefaultEvictionPolicy` (the current default for size-based eviction) does not guarantee the removal of the oldest entries globally. It sorts individual `EntryCache` instances by their size, selects a percentage of the largest caches, and then asks each of them to evict a proportional amount of data. This can lead to newer entries being evicted from large, active caches while older, less relevant entries remain in smaller or less active caches, resulting in suboptimal cache utilization and potentially lower hit rates.
- Inefficient and Incorrect Timestamp-Based Eviction: The existing timestamp-based eviction mechanism, triggered by `ManagedLedgerFactoryImpl`, has significant performance and correctness issues:
  - It iterates over all `ManagedLedgerImpl` instances and their respective `EntryCache` instances periodically (by default every 10ms). In brokers with a large number of topics, this frequent and exhaustive iteration leads to high CPU utilization and memory pressure.
  - The per-cache eviction logic (`RangeCache.evictLEntriesBeforeTimestamp`) often assumes entries within a single `RangeCache` are primarily ordered by timestamp due to typical append-only workloads. This assumption breaks down with mixed read patterns like catch-up reads, or when entries are inserted out of their natural position order (the Key_Shared subscription replay queue scenario), potentially leading to incorrect eviction decisions or inefficient scanning.
- Limited Cache Scope and Effectiveness for Diverse Read Patterns: The original `RangeCache` was primarily designed with tailing reads in mind. While support for caching backlogged cursors and replay queue reads was added later, the eviction algorithms were not holistically updated to effectively manage mixed read patterns (tailing, catch-up, replays in Key_Shared). This can lead to lower cache hit rates and unnecessary BookKeeper reads for these patterns.
- Foundation for Advanced Caching Strategies Needed: The current cache architecture makes it difficult to implement more intelligent caching strategies that could further optimize for common Pulsar use cases, such as efficiently handling fan-out to multiple shared consumers or retaining entries expected to be read by several cursors.
Addressing these issues is crucial for improving broker performance, reducing operational costs (lower BookKeeper load), and providing a more robust caching layer that can adapt to diverse workloads.
The refactoring aims to make cache eviction more robust, performant, and predictable. The “expected read count” strategy is an attempt to make the cache more aware of Pulsar's specific consumption patterns.
The key changes are:

- Replace the `EntryCacheDefaultEvictionPolicy` for size-based eviction with a centralized, insertion-order aware mechanism.
- Introduce a `RangeCacheRemovalQueue` that tracks all cached entries globally in approximate insertion order.
- Introduce a new "expected read count" strategy and a configuration option (`cacheEvictionByExpectedReadCount`) to enable this strategy.
- Extend an entry's TTL by `managedLedgerCacheEvictionTimeThresholdMillis` during the eviction check if it has been accessed since the last check, controlled by the `managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed` configuration option.

The proposed solution involves two main components: a refactored eviction mechanism using a centralized removal queue, and a new cache strategy based on expected read count combined with LRU-like cache behavior.
RangeCacheRemovalQueue
A new component, `RangeCacheRemovalQueue`, will be introduced at the `EntryCacheManager` level.
- Insertion: When an entry is inserted into a `RangeEntryCacheImpl` (the per-ledger cache), a lightweight wrapper for the entry (`RangeCacheEntryWrapper`) is also added to the global `RangeCacheRemovalQueue`. This queue maintains entries in their global insertion order (FIFO). The wrapper is already necessary in `RangeEntryCacheImpl` to prevent consistency issues; the current internal wrapper class is refactored to a top-level class so that it can be used with the removal queue.
- Timestamp-based eviction: A single scheduled task (on `ManagedLedgerFactoryImpl`'s `cacheEvictionExecutor`) will process the `RangeCacheRemovalQueue`. It will iterate from the head of the queue, removing entries whose `timestampNanos` are older than the `cacheEvictionTimeThresholdNanos`. Since the queue is insertion-ordered, this process can often stop early once it encounters an entry that is not expired.
- Size-based eviction: When the `EntryCacheManager` detects that the total cache size exceeds `evictionTriggerThresholdPercent * maxSize`, it will trigger an eviction cycle. This cycle also processes the `RangeCacheRemovalQueue` from the head, removing the oldest entries (regardless of which specific ledger they belong to) until the cache size is brought down to `cacheEvictionWatermark * maxSize`.
- Requeueing: Entries with remaining expected reads are requeued to the tail of the `removalQueue` instead of being evicted. Requeued entries are reconsidered in subsequent eviction passes or when they eventually expire, bounded by the `managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes` setting. This prevents premature eviction of entries that are likely to be read again soon. Similarly, there is a requeue mechanism for entries that have been accessed since the last eviction check when `managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed` is set to true: such an entry is requeued to the end of the `removalQueue` and its TTL is extended by `managedLedgerCacheEvictionTimeThresholdMillis`. This is similar to LRU cache behavior.

This centralized approach replaces the distributed, per-cache iteration for timestamp eviction and the less precise `EntryCacheDefaultEvictionPolicy` for size eviction, leading to more globally optimal and efficient eviction decisions.
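To make the timestamp-based pass concrete, here is a minimal, self-contained sketch. It is illustrative only: the names `RemovalQueueSketch` and `CachedEntryWrapper` are hypothetical simplifications, and the actual implementation uses a JCTools MPSC queue of `RangeCacheEntryWrapper` instances tied to the per-ledger `RangeCache`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class RemovalQueueSketch {
    static final class CachedEntryWrapper {
        final long timestampNanos; // insertion time of the cached entry

        CachedEntryWrapper(long timestampNanos) {
            this.timestampNanos = timestampNanos;
        }
    }

    private final Queue<CachedEntryWrapper> removalQueue = new ArrayDeque<>();

    void entryInserted(CachedEntryWrapper wrapper) {
        // Entries are appended on insertion, so the queue stays in
        // approximate global insertion order (FIFO).
        removalQueue.add(wrapper);
    }

    // Removes entries older than the threshold. Because the queue is
    // insertion-ordered, the pass can stop at the first non-expired entry.
    int evictEntriesOlderThan(long thresholdTimestampNanos) {
        int evictedCount = 0;
        CachedEntryWrapper head;
        while ((head = removalQueue.peek()) != null
                && head.timestampNanos < thresholdTimestampNanos) {
            removalQueue.poll();
            evictedCount++; // the real code also invalidates the entry in its RangeCache
        }
        return evictedCount;
    }
}
```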
This FIFO queue-based solution is influenced by the S3FIFO cache eviction algorithm (algorithm explanation), although it is not an implementation of S3FIFO. It accomplishes a similar goal to S3FIFO, but for the managed ledger broker cache. The goal of S3FIFO is to quickly remove entries that are unlikely to be accessed in the future; similarly, the PIP-430 broker cache removal queue aims to quickly and efficiently remove entries that are unlikely to be accessed in the future. The reason S3FIFO is not implemented as-is in the broker cache is that the broker cache uses the “expected read count” concept to determine whether an entry is likely to be accessed in the future. This information is not available in generic cache implementations, and in the broker cache it can be used to keep entries that are likely to be accessed in the future in the cache for longer.
The S3FIFO algorithm explanation blog post states: “While the eviction algorithms so far have been centered around LRU, I believe modern eviction algorithms should be designed with FIFO queues.” This is one of the reasons why the PIP-430 broker cache removal queue is also based on a FIFO queue.
Expected Read Count Strategy

This new strategy aims to improve cache hit rates by retaining entries that are likely to be read by multiple active consumers/cursors.
- `EntryReadCountHandler`: Each cached entry (`EntryImpl`) will be associated with an `EntryReadCountHandlerImpl`. This handler maintains an `expectedReadCount` (an atomic integer).
- Initialization on the write path: When an entry is added (`OpAddEntry`), its `expectedReadCount` is initialized to the number of active cursors.
- Initialization on the read path: When an entry is cached after being read from storage (`RangeEntryCacheImpl.readFromStorage`), the `expectedReadCount` is initialized based on the number of active cursors currently positioned before or at the entry being added. This information is sourced from `ActiveManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor)`.
- Reads: When a cursor reads an entry from the cache, its `expectedReadCount` is decremented (via `EntryReadCountHandler.markRead()`).
- Replays: The `expectedReadCount` on a cached entry is incremented when the entry is added to the replay queue. This de-prioritizes the removal of the entry from the cache when size-based eviction is performed, so that when the Key_Shared consumer is available to read the entry, it is more likely to still be in the cache.
- Eviction: The `RangeCacheRemovalQueue`'s size-based eviction logic will consult `EntryImpl.hasExpectedReads()`. This method returns `true` if `expectedReadCount > 0` and it has been defined.

New configuration options for this strategy:

- `cacheEvictionByExpectedReadCount` (boolean, default: `true`): Enables the new eviction strategy based on expected read count. When true, entries with `expectedReadCount > 0` are less likely to be evicted by size-based eviction unless they also meet timestamp expiration.
- `managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes` (integer, default: `5`): Maximum number of times the cache can extend the TTL of an entry that has remaining expected reads. Only takes effect when `cacheEvictionByExpectedReadCount` is enabled.
- `managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed` (boolean, default: `true`): Controls whether recently accessed entries in the managed ledger cache should have their lifetime extended before cache eviction.

This strategy allows the cache to be more intelligent about retaining entries that have higher utility, especially in fan-out scenarios or with slightly lagging consumers in Key_Shared subscriptions. The replay queue is also used in Shared subscriptions when more entries are read than can be consumed by the available consumers; this strategy also avoids cache misses in those scenarios.
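As an illustration of this lifecycle, the following self-contained example walks through an entry observed by three active cursors. `SimpleReadCountHandler` is a hypothetical stand-in for `EntryReadCountHandlerImpl`; the `EntryReadCountHandler` interface (shown later in this document) is reproduced inline to keep the example runnable.

```java
import java.util.concurrent.atomic.AtomicInteger;

interface EntryReadCountHandler {
    int getExpectedReadCount();
    void incrementExpectedReadCount();
    void markRead();
    default boolean hasExpectedReads() {
        return getExpectedReadCount() >= 1;
    }
}

final class SimpleReadCountHandler implements EntryReadCountHandler {
    private final AtomicInteger expectedReadCount;

    SimpleReadCountHandler(int initialCount) {
        this.expectedReadCount = new AtomicInteger(initialCount);
    }

    @Override
    public int getExpectedReadCount() {
        return expectedReadCount.get();
    }

    @Override
    public void incrementExpectedReadCount() {
        expectedReadCount.incrementAndGet();
    }

    @Override
    public void markRead() {
        expectedReadCount.decrementAndGet();
    }

    public static void main(String[] args) {
        // Entry added while 3 cursors are active: expectedReadCount starts at 3.
        EntryReadCountHandler handler = new SimpleReadCountHandler(3);
        handler.markRead();                   // first cursor reads the entry -> 2 remaining
        handler.markRead();                   // second cursor reads the entry -> 1 remaining
        handler.incrementExpectedReadCount(); // entry added to a replay queue -> 2 remaining
        // Still has expected reads, so size-based eviction de-prioritizes removal.
        System.out.println(handler.hasExpectedReads()); // prints: true
    }
}
```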
RangeCacheRemovalQueue

- `removalQueue`: An `org.jctools.queues.MpscUnboundedArrayQueue<RangeCacheEntryWrapper>` holds entries in insertion order in a FIFO queue.
- An `EvictionPredicate` determines the `EvictionResult` for each entry (see the sketch after this list):
  - `REMOVE`: Entry is evicted.
  - `REQUEUE`: Entry is moved to the tail of the `removalQueue` (which extends the TTL by `managedLedgerCacheEvictionTimeThresholdMillis`).
  - `STOP`: Stops processing further entries in the current pass (e.g., for timestamp eviction when a non-expired entry is found, or for size eviction when enough space is freed).
  - `MISSING`: Entry was already removed from the cache by other means.
- `evictLEntriesBeforeTimestamp(long timestampNanos)`: Uses a predicate to remove entries whose `timestampNanos` is older than the provided timestamp. It requeues entries if `extendTTLOfRecentlyAccessed` is enabled and the entry has been accessed since the last eviction check, or if the entry has remaining expected reads and the requeue count is less than the configured maximum.
- `evictLeastAccessedEntries(long sizeToFree, long expirationTimestampNanos)`: Uses a predicate to remove the least accessed entries until the specified size is freed. It considers `extendTTLOfRecentlyAccessed` and the expected read count when determining eviction eligibility.
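A hedged sketch of how a single eviction pass could interpret these `EvictionResult` values. `EvictionPassSketch` and `EntrySketch` are hypothetical simplifications; the real implementation processes a JCTools MPSC queue and uses a stash for non-evictable entries (mentioned in the alternatives section below).

```java
import java.util.ArrayDeque;

class EvictionPassSketch {
    enum EvictionResult { REMOVE, REQUEUE, STOP, MISSING }

    interface EvictionPredicate {
        EvictionResult evaluate(EntrySketch entry);
    }

    static final class EntrySketch {
        long sizeBytes;
    }

    private final ArrayDeque<EntrySketch> removalQueue = new ArrayDeque<>();

    // Processes at most one full pass over the current queue contents,
    // returning the number of bytes freed.
    long runEvictionPass(EvictionPredicate predicate) {
        long freedBytes = 0;
        // Bound the pass to the current size so requeued entries are not
        // reprocessed within the same pass.
        int entriesToInspect = removalQueue.size();
        for (int i = 0; i < entriesToInspect; i++) {
            EntrySketch entry = removalQueue.poll();
            if (entry == null) {
                break;
            }
            switch (predicate.evaluate(entry)) {
                case REMOVE -> freedBytes += entry.sizeBytes; // evict and account freed size
                case REQUEUE -> removalQueue.addLast(entry);  // extend lifetime; revisit later
                case MISSING -> { }                           // already removed elsewhere; skip
                case STOP -> {
                    removalQueue.addFirst(entry);             // put back and stop this pass
                    return freedBytes;
                }
            }
        }
        return freedBytes;
    }
}
```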
EntryImpl Modifications and EntryReadCountHandler

The values stored inside a `RangeCache` are `EntryImpl` instances (they implement `ReferenceCountedEntry`). `EntryImpl` has been modified to hold a reference to an `EntryReadCountHandler` instance, which handles the state related to the read count. `EntryReadCountHandler` serves as an abstraction and integration point between `RangeCacheRemovalQueue`, `EntryImpl`, and the managed ledger layer. The implementation of `EntryReadCountHandler` holds the state in a single volatile `int` field.
```java
public interface EntryReadCountHandler {
    int getExpectedReadCount();

    void incrementExpectedReadCount();

    void markRead();

    default boolean hasExpectedReads() {
        return getExpectedReadCount() >= 1;
    }
}
```
The `EntryReadCountHandler` interface:

- `int getExpectedReadCount()`: Returns the current expected read count.
- `void incrementExpectedReadCount()`: Increments the count.
- `void markRead()`: Decrements the count.

The `EntryReadCountHandlerImpl` class uses an `AtomicIntegerFieldUpdater` for `expectedReadCount`, as sketched below.
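A sketch of how the state can be held in a single volatile `int` updated through an `AtomicIntegerFieldUpdater`, avoiding a separate `AtomicInteger` allocation per cached entry. The class below is an illustration under those assumptions, not the actual `EntryReadCountHandlerImpl` source.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class ReadCountStateSketch {
    private static final AtomicIntegerFieldUpdater<ReadCountStateSketch> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ReadCountStateSketch.class, "expectedReadCount");

    // Single volatile int field holding all read-count state for an entry.
    private volatile int expectedReadCount;

    ReadCountStateSketch(int initialExpectedReadCount) {
        UPDATER.set(this, initialExpectedReadCount);
    }

    int getExpectedReadCount() {
        return expectedReadCount;
    }

    void incrementExpectedReadCount() {
        UPDATER.incrementAndGet(this);
    }

    void markRead() {
        UPDATER.decrementAndGet(this);
    }
}
```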
ActiveManagedCursorContainer and ActiveManagedCursorContainerImpl
To efficiently track the number of active cursors at or before a given position, a new `ActiveManagedCursorContainer` interface is introduced. The `ActiveManagedCursorContainerImpl` class implements the tracking using a sorted linked list, allowing O(1) access to the number of cursors at a given position. The `ActiveManagedCursorContainerImpl` is only used when `cacheEvictionByExpectedReadCount` is enabled. In other cases, the previous `ManagedCursorContainer` implementation is used, although it is renamed to `ManagedCursorContainerImpl` so that the implementation can be decoupled from the interface. JMH benchmarks show that the new `ActiveManagedCursorContainerImpl` is currently more efficient than the previous `ManagedCursorContainerImpl` implementation for all measured use cases. This points to a performance improvement opportunity for the `ManagedCursorContainerImpl` implementation, but it is not a priority for this PIP.
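The following TreeMap-based sketch illustrates only the counting semantics behind `getNumberOfCursorsAtSamePositionOrBefore`; it is an assumption for illustration, not the actual data structure. It counts in O(n) per lookup, whereas the sorted-linked-list `ActiveManagedCursorContainerImpl` maintains the ordering incrementally and achieves O(1) lookups.

```java
import java.util.Comparator;
import java.util.TreeMap;

class CursorPositionTrackerSketch {
    // Position (ledgerId, entryId), ordered by ledger then entry.
    record Position(long ledgerId, long entryId) {}

    private static final Comparator<Position> ORDER =
            Comparator.comparingLong(Position::ledgerId).thenComparingLong(Position::entryId);

    // Position -> number of cursors currently at that position.
    private final TreeMap<Position, Integer> cursorsAtPosition = new TreeMap<>(ORDER);

    void cursorMoved(Position from, Position to) {
        if (from != null) {
            // Drop the count at the old position; remove the key when it reaches zero.
            cursorsAtPosition.computeIfPresent(from, (p, n) -> n == 1 ? null : n - 1);
        }
        cursorsAtPosition.merge(to, 1, Integer::sum);
    }

    // Number of cursors positioned at or before the given position.
    int getNumberOfCursorsAtSamePositionOrBefore(Position pos) {
        return cursorsAtPosition.headMap(pos, true).values().stream()
                .mapToInt(Integer::intValue).sum();
    }
}
```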
ManagedLedgerImpl

In `OpAddEntry`:

```java
if (!(ml instanceof ShadowManagedLedgerImpl)) {
    int activeCursorCount = ml.getActiveCursors().size();
    if (activeCursorCount > 0) {
        int expectedReadCount = 0;
        if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
            // For newly added entries, all active cursors are effectively
            // "before" the added entry for future reads.
            expectedReadCount = activeCursorCount;
        }
        EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount);
        // Cache owns the primary read count handling now
        entry.setDecreaseReadCountOnRelease(false);
        ml.entryCache.insert(entry);
        entry.release();
    }
}
```
- `ManagedLedgerImpl.asyncReadEntry`: An `expectedReadCount` parameter (an `IntSupplier`) is passed down to `RangeEntryCacheImpl.asyncReadEntry0`. The `IntSupplier` resolves to `opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore()` when `cacheEvictionByExpectedReadCount` is true.
- `RangeEntryCacheImpl.readFromStorage`: Accepts an `IntSupplier expectedReadCount`. When creating an `EntryImpl` from a `LedgerEntry`, it passes `expectedReadCount.getAsInt()` to `EntryImpl.create`.
- `EntryImpl.release()`: A flag `decreaseReadCountOnRelease` (default `false`) is added to `EntryImpl`. When `EntryImpl.create(Entry other)` (the copy constructor used for dispatch) is called, this flag is set to `true` on the copy. In `EntryImpl.beforeDeallocate()`, if `decreaseReadCountOnRelease` is true and `readCountHandler` is not null, `readCountHandler.markRead()` is called. This ensures that the read count on the original cached entry's handler is decremented only when an entry copy created for dispatch is released, as sketched below.
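A hedged sketch of this flow, using simplified stand-ins rather than the actual `EntryImpl` code: the cached original and its dispatch copy share one counter, and only the dispatch copy decrements it on release.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class EntryReleaseSketch {
    private final AtomicInteger expectedReadCount;   // shared with the cached original
    private final boolean decreaseReadCountOnRelease;

    private EntryReleaseSketch(AtomicInteger sharedCount, boolean decreaseOnRelease) {
        this.expectedReadCount = sharedCount;
        this.decreaseReadCountOnRelease = decreaseOnRelease;
    }

    // Cached original: releasing it must NOT count as a read.
    static EntryReleaseSketch createCached(int expectedReads) {
        return new EntryReleaseSketch(new AtomicInteger(expectedReads), false);
    }

    // Copy created for dispatch: shares the counter and decrements on release.
    static EntryReleaseSketch createForDispatch(EntryReleaseSketch original) {
        return new EntryReleaseSketch(original.expectedReadCount, true);
    }

    // Corresponds to beforeDeallocate() in the proposal.
    void release() {
        if (decreaseReadCountOnRelease) {
            expectedReadCount.decrementAndGet(); // markRead()
        }
    }
}
```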
RangeEntryCacheManagerImpl
- `doCacheEviction`: Called by the global eviction scheduler. Invokes `evictionHandler.invalidateEntriesBeforeTimestampNanos`. It would also have to take the separate `managedLedgerCacheEvictionTimeThresholdMillis` and `managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes` settings into account, and it calls `doEvictToWatermarkWhenOverThreshold()` to ensure cache size is within limits.
- `triggerEvictionWhenNeeded()`: Called after an entry is added. Schedules `doEvictToWatermarkWhenOverThreshold()` on the `cacheEvictionExecutor`.
- `doEvictToWatermarkWhenOverThreshold()`: Calculates `sizeToEvict` and calls `evictionHandler.evictEntries(sizeToEvict, expirationTimestampNanosForNonEvictableEntries)`.

The calculation of the entry cache size is not accurate unless the `managedLedgerCacheCopyEntries` setting is set to `true`. This occurs because when the entry payload is read from BookKeeper or received from a Pulsar client publisher, it is “sliced” from an underlying buffer. The actual memory consumption can be higher when the other entries of the shared underlying buffer have been released and a single entry retains the complete underlying buffer.
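The retention effect can be demonstrated with a small standalone Netty example (not Pulsar code): a 1 KB retained slice keeps the whole 1 MB shared buffer alive until the slice itself is released.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class SliceRetentionExample {
    public static void main(String[] args) {
        // Simulates a large cumulated receive buffer holding many entries.
        ByteBuf shared = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024);
        shared.writeZero(1024 * 1024);

        // A single cached entry is a small retained slice of the shared buffer.
        ByteBuf entry = shared.retainedSlice(0, 1024);

        // Releasing the original reference does not free the 1 MB of memory:
        // the 1 KB slice still holds a reference to the whole underlying buffer.
        shared.release();
        System.out.println("slice refCnt: " + entry.refCnt()); // still alive

        entry.release(); // only now can the underlying 1 MB buffer be freed
    }
}
```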
For entries received from a Pulsar client publisher, the underlying buffer size is between 16 kB and 1 MB. This is currently not configurable and is set in code in the `BrokerService.defaultServerBootstrap` method:
```java
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
        new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
```
The actual underlying buffer size also depends on the logic of Netty's `LengthFieldBasedFrameDecoder` (which extends `ByteToMessageDecoder`): it cumulates buffers by copying until the complete entry has been read. In that case, the underlying buffer size can be larger than 1 MB. The cumulated buffer will be shared across multiple entries, since entry buffers are sliced from the cumulated buffer.
For entries received from BookKeeper, the situation is similar. However, the BookKeeper client defaults to Netty's default channel config settings for `RCVBUF_ALLOCATOR`, which has a maximum buffer size of 64 kB.
It would be useful to make the Pulsar broker side `RCVBUF_ALLOCATOR` parameters configurable so that they could be tuned to a smaller size; it is very unlikely that the 1 MB maximum size improves performance significantly. There is a performance tradeoff in the case where messages are large and buffers have to be merged by copying in the `LengthFieldBasedFrameDecoder`. To avoid this overhead, it would be possible to make `LengthFieldBasedFrameDecoder` use composite buffers for merging by setting the `cumulator` property to `ByteToMessageDecoder.COMPOSITE_CUMULATOR`; however, that could have a negative performance impact on code that expects the complete payload to be a single contiguous buffer.
Since in many cases the underlying buffer from which a single entry's slice is taken is shared with entries that were published or retrieved together, it is also common that these entries get evicted together. Making the `RCVBUF_ALLOCATOR` settings configurable in the Pulsar broker is a sufficient mitigation for the problem. In cases where shared buffers add significant memory overhead, it will be possible to reduce it by setting the maximum size for `AdaptiveRecvByteBufAllocator` to 64 kB, with the tradeoff of possible unnecessary buffer copies for messages exceeding 64 kB. In the BookKeeper server, there are settings `byteBufAllocatorSizeInitial`, `byteBufAllocatorSizeMin`, and `byteBufAllocatorSizeMax` to configure the `AdaptiveRecvByteBufAllocator` parameters. The naming of these parameters in BookKeeper isn't optimal, since the settings are specifically about `AdaptiveRecvByteBufAllocator` parameters and not “byteBufAllocator” parameters.
The proposal would be to add these configuration parameters to `broker.conf` for controlling the broker's `AdaptiveRecvByteBufAllocator` parameters:
```properties
# Netty adaptive receive buffer allocator's minimum size
brokerAdaptiveRecvByteBufAllocatorMinimumSize=1024
# Netty adaptive receive buffer allocator's initial size
brokerAdaptiveRecvByteBufAllocatorInitialSize=16384
# Netty adaptive receive buffer allocator's maximum size
# Tune this value to a lower value to reduce the overhead of entries cached in the broker cache due to shared underlying buffers
brokerAdaptiveRecvByteBufAllocatorMaximumSize=1048576
```
New Configuration: broker.conf
- `cacheEvictionByExpectedReadCount` (boolean): Enables the new eviction strategy based on expected read count. When true, entries with `expectedReadCount > 0` are less likely to be evicted by size-based eviction unless they also meet timestamp expiration. Default: `true`.
- `managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes` (int): Maximum number of times the cache can extend the TTL of an entry that has remaining expected reads. Only takes effect when `cacheEvictionByExpectedReadCount` is enabled. This helps optimize cache efficiency for scenarios like:
  - Key_Shared subscription replays
  - Catch-up reads for lagging consumers
  - Consumers temporarily falling behind the tail

  Entries with remaining expected reads will have their TTL extended up to this many times before being eligible for eviction. The TTL will be extended by `managedLedgerCacheEvictionTimeThresholdMillis` each time. Default: `5`.
- `managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed` (boolean): Controls whether recently accessed entries in the managed ledger cache should have their lifetime extended before cache eviction. When enabled, an entry that has been accessed since the last eviction check is requeued and its TTL is extended by `managedLedgerCacheEvictionTimeThresholdMillis`. Default: `true`.
- `managedLedgerContinueCachingAddedEntriesAfterLastActiveCursorLeavesMillis` (long): Controls how long added entries continue to be cached after the last active cursor stops being considered active. Only takes effect when `cacheEvictionByExpectedReadCount` is enabled. Defaults to the value of `managedLedgerCacheEvictionTimeThresholdMillis`.
Modified Behavior of Existing Configurations:
- `managedLedgerCacheEvictionTimeThresholdMillis` (long): Controls the time-to-live (TTL) for entries in the managed ledger (broker) cache. The TTL can be extended in two ways:
  - When `cacheEvictionByExpectedReadCount` is enabled: TTL is extended for entries with remaining expected reads. The maximum number of extensions is controlled by `managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes`.
  - When `managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed` is enabled: TTL is extended for entries accessed since the last expiration check.

  Default: `1000`.
- `managedLedgerCacheSizeMB`: Still applies. The new size-based eviction will use the `RangeCacheRemovalQueue`.
- `cacheEvictionByMarkDeletedPosition`: If `cacheEvictionByExpectedReadCount` is `true`, this setting's direct influence on preserving entries is diminished, as `expectedReadCount` provides more granular control. However, mark-delete position updates still occur. If `cacheEvictionByExpectedReadCount` is `false`, this setting functions as before (though the underlying eviction for “up to slowest reader” now also uses the central queue).
- `managedLedgerCursorBackloggedThreshold`, `managedLedgerMinimumBacklogCursorsForCaching`, `managedLedgerMinimumBacklogEntriesForCaching`, `managedLedgerMaxBacklogBetweenCursorsForCaching`: These settings primarily determine whether a cursor is considered “active” for caching purposes (i.e., `ManagedCursorImpl.isCacheReadEntry()`). When `cacheEvictionByExpectedReadCount` is true, these settings are ignored, since they would conflict with the expected read count implementation and would not benefit the caching algorithm. The detection based on `managedLedgerCursorBackloggedThreshold` and the related configuration parameters relies on periodic invocations of `checkBackloggedCursors` and `checkCursorsToCacheEntries`, which occur once per minute during statistics updates. The `cacheEvictionByExpectedReadCount` solution eliminates the need for these settings, since the efficient eviction algorithm can adapt to cases where far-behind backlogged cursors would otherwise cause entries to be cached longer than necessary. The size-based eviction will efficiently remove entries in a prioritized manner, ensuring that entries with a positive expected read count are retained longer, while entries with no expected reads are evicted more aggressively.

Existing broker cache metrics will continue to function, reflecting the behavior of the new eviction system and broker cache strategy.
There are no compatibility concerns since the broker cache is handled at runtime, in a single broker.
Simple LRU (Least Recently Used) for Global Cache:
Priority Queue for RangeCacheRemovalQueue:

A priority queue ordered by `expectedReadCount` (ascending) and then timestamp (ascending) could be used. However, the fact that `expectedReadCount` can change dynamically for entries already in the queue adds significant complexity: updates would require O(log N) or O(N) operations depending on the queue implementation and how updates are handled. The current MPSC queue with a “stash” for non-evictable items offers a simpler, lower-overhead approach for the initial refactoring. Performance of MPSC queues is very high for enqueue/dequeue, and the “stash” handles the “deprioritizing” of eviction for entries with positive read counts without complex queue reordering. This can be revisited if the stash mechanism proves insufficient.