Apache Pulsar brokers provide commands for clients to discover topics within a namespace and watch for topic updates. These commands are critical for client operations but currently lack memory limits and flow control mechanisms, creating potential memory and stability issues at scale.
Pulsar brokers already implement comprehensive memory management for most operations through several key configurations:
Message Publishing Memory Limits:
- `maxMessagePublishBufferSizeInMB` (default: 50% of direct memory): Limits memory used for buffering messages during publishing, providing backpressure when producers exceed broker capacity.

Managed Ledger Memory Limits:
- `managedLedgerMaxReadsInFlightSizeInMB` (default: 0, disabled): Controls memory allocation for concurrent read operations from BookKeeper, preventing excessive memory usage during high read loads. This limit extends to cover buffers that were read from BookKeeper and are waiting in channel outbound buffers to be written to client sockets.
- `managedLedgerCacheSizeMB` (default: 20% of direct memory): Limits cache memory for recently read ledger entries, ensuring predictable memory usage for read caching. This limit extends to cover buffers that were read from the cache and are waiting in channel outbound buffers to be written to client sockets.

Additional Memory Controls:
- `maxConcurrentLookupRequest` (default: 50000): Limits concurrent topic lookup requests. The unit of this limit is the number of requests; it is not expressed in memory size.
- `maxConcurrentTopicLoadRequest` (default: 5000): Controls concurrent topic loading operations. The unit of this limit is the number of requests; it is not expressed in memory size.

These existing limits effectively bound memory usage for message handling, storage operations, and most broker functions. However, there is a significant gap in memory management for topic discovery operations.
Major unbounded memory allocation in Pulsar brokers occurs during topic listing operations:
- `CommandGetTopicsOfNamespace` / `CommandGetTopicsOfNamespaceResponse`
- `CommandWatchTopicList` / `CommandWatchTopicListSuccess` & `CommandWatchTopicUpdate`

These operations can allocate arbitrary amounts of memory based on namespace size, with no limits or backpressure mechanisms.
Topic Discovery Commands:
- `CommandGetTopicsOfNamespace`: Binary protocol command that retrieves all topics within a namespace
- `CommandGetTopicsOfNamespaceResponse`: Response containing the list of topics
- `CommandWatchTopicList`: Command to establish a watch for topic list changes
- `CommandWatchTopicListSuccess`: Initial response confirming watch establishment and returning the current topic list
- `CommandWatchTopicUpdate`: Notifications sent when topics are added or removed
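These commands are typically driven by clients that subscribe with a topics pattern. As an illustration (the service URL, namespace, and subscription name below are placeholders, and this snippet is not part of the proposed change), a regex subscription causes the client to list the namespace's topics via `CommandGetTopicsOfNamespace` and, when topic list watchers are in use, to receive updates via `CommandWatchTopicList` / `CommandWatchTopicUpdate`:

```java
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

public class PatternSubscribeExample {
    public static void main(String[] args) throws Exception {
        // Illustrative client; values are placeholders
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // A pattern (regex) subscription makes the client list and watch the
        // topics of the namespace via the commands described above
        Consumer<byte[]> consumer = client.newConsumer()
                .topicsPattern(Pattern.compile("persistent://public/default/.*"))
                .subscriptionName("pattern-sub")
                .subscribe();

        // ... consume messages ...

        consumer.close();
        client.close();
    }
}
```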
Current Implementation Flow:
The `getTopicsOfNamespace` request follows this path:
1. The client sends `CommandGetTopicsOfNamespace` via the binary protocol.
2. The broker handles it in `ServerCnx.handleGetTopicsOfNamespace()` (or the proxy in `ProxyConnection.handleGetTopicsOfNamespace()`, which forwards the request to a broker).
3. `NamespaceService.getListOfUserTopics()` orchestrates the lookup: it reads topic names via `TopicResources`, filters system topics with `TopicList.filterSystemTopic()`, and uses `inProgressQueryUserTopics` to prevent duplicate queries.

Unlike other broker operations that have memory limits, topic listing operations create unbounded memory allocation scenarios:
Memory Allocation Points:
Scale Impact:
The lack of memory limits for topic listing commands creates the final significant gap in Pulsar's otherwise comprehensive memory management system:
Memory Management Consistency: While all other broker operations have memory limits and backpressure mechanisms, topic listing operations remain unbounded, creating an inconsistent and unpredictable memory profile.
Broker Memory Exhaustion Risk: Large clusters can trigger OutOfMemoryErrors when multiple clients simultaneously request topic lists, causing broker crashes and service disruption despite other memory controls being in place.
Proxy Memory Exhaustion Risk: Proxies are also impacted for CommandGetTopicsOfNamespace
since the request is forwarded to a broker and the response is deserialized and reserialized without limits.
Unpredictable Resource Usage: Operators cannot reliably predict or limit total broker or proxy memory consumption due to this unbounded allocation path, undermining capacity planning and resource management.
Performance Degradation: Even without OOM, large topic list operations cause GC pressure and latency spikes affecting all broker operations, counteracting the stability provided by other memory limits.
This proposal adds memory limits and backpressure for the `CommandGetTopicsOfNamespace` and `CommandWatchTopicList` commands.

The solution introduces an `AsyncDualMemoryLimiter` that acts as a memory-aware semaphore for topic listing operations, completing Pulsar's memory management framework:
Memory Tracking: Before processing requests or sending responses, the system estimates memory requirements and acquires permits from the limiter. When the memory requirement cannot be estimated before the operation starts, an initial fixed-size permit is acquired up front and later updated to reflect the actual size once it is known. Although not optimal, this still effectively limits memory usage across the broker.
Dual Memory Pools: Separate tracking for heap memory (topic list assembly) and direct memory (network buffers) with independent limits, since topic listing operations use both types of memory.
Asynchronous Backpressure: When memory limits are reached, requests queue with configurable timeouts rather than failing immediately, providing graceful degradation similar to the `managedLedgerMaxReadsInFlightSizeInMB` behavior. This approach is preferable to rejecting requests and requiring clients to retry, which would add load to the system and lead to unfair ordering of requests. Only when the queue is completely full are requests rejected.
Graceful Degradation: The system continues processing within memory limits rather than crashing, with clear metrics indicating when memory-based throttling occurs.
Release Guarantees: Memory permits are released after response transmission completes or on request failure, preventing memory leaks and ensuring accurate memory tracking.
This is an abstraction for a generic asynchronous semaphore. The memory limiter implementation will use this abstraction to implement separate limiters for heap and direct memory.
```java
import java.util.concurrent.CompletableFuture;

public interface AsyncSemaphore {

    /**
     * Acquire permits from the semaphore.
     * The returned future completes when the permits are available.
     * It completes exceptionally with AsyncSemaphorePermitAcquireTimeoutException on timeout
     * and with AsyncSemaphorePermitAcquireQueueFullException when the queue is full.
     *
     * @return CompletableFuture that completes with the permit when available
     */
    CompletableFuture<AsyncSemaphorePermit> acquire(long permits);

    /**
     * Acquire or release permits relative to a previously acquired permit by updating its permit count.
     * The returned future completes when the permits are available.
     * It completes exceptionally with AsyncSemaphorePermitAcquireTimeoutException on timeout
     * and with AsyncSemaphorePermitAcquireQueueFullException when the queue is full.
     *
     * @return CompletableFuture that completes with the updated permit when available
     */
    CompletableFuture<AsyncSemaphorePermit> update(AsyncSemaphorePermit permit, long newPermits);

    /**
     * Release a previously acquired permit.
     * Must be called to prevent permit leaks.
     */
    void release(AsyncSemaphorePermit permit);
}
```
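For illustration, here is a minimal sketch of how a caller could use this abstraction, including handling of the two exceptional completions named in the Javadoc; the integration snippets later in this proposal leave that error handling as comments. The class name, helper method, and error-mapping strategy here are assumptions, not part of the proposal:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AsyncSemaphoreUsageSketch {

    private final AsyncSemaphore semaphore;

    public AsyncSemaphoreUsageSketch(AsyncSemaphore semaphore) {
        this.semaphore = semaphore;
    }

    // Runs a memory-consuming task while holding permits, releasing them afterwards.
    public CompletableFuture<Void> runWithPermits(long estimatedPermits, Runnable task) {
        return semaphore.acquire(estimatedPermits)
                .thenAccept(permit -> {
                    try {
                        task.run();
                    } finally {
                        // Always release to prevent permit leaks (see the release guarantees above)
                        semaphore.release(permit);
                    }
                })
                .exceptionally(t -> {
                    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                            ? t.getCause() : t;
                    if (cause instanceof AsyncSemaphorePermitAcquireTimeoutException) {
                        // Waited longer than the configured timeout for permits: backpressure kicked in
                    } else if (cause instanceof AsyncSemaphorePermitAcquireQueueFullException) {
                        // Limiter queue was full: the request is rejected immediately
                    }
                    // Propagate so the caller can map this to a protocol-level error response
                    throw new CompletionException(cause);
                });
    }
}
```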
This is an abstraction for an asynchronous memory semaphore that tracks separate limits for heap and direct memory.
```java
import java.util.concurrent.CompletableFuture;

public interface AsyncDualMemoryLimiter {

    enum LimitType {
        HEAP_MEMORY,   // For heap memory allocation
        DIRECT_MEMORY  // For direct memory allocation
    }

    /**
     * Acquire permits for the specified memory size.
     * The returned future completes when memory permits are available.
     * It completes exceptionally with AsyncSemaphorePermitAcquireTimeoutException on timeout
     * and with AsyncSemaphorePermitAcquireQueueFullException when the queue is full.
     *
     * @return CompletableFuture that completes with the permit when available
     */
    CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, LimitType limitType);

    /**
     * Acquire or release permits relative to a previously acquired permit by updating the requested memory size.
     * The returned future completes when memory permits are available.
     * It completes exceptionally with AsyncSemaphorePermitAcquireTimeoutException on timeout
     * and with AsyncSemaphorePermitAcquireQueueFullException when the queue is full.
     *
     * @return CompletableFuture that completes with the updated permit when available
     */
    CompletableFuture<AsyncDualMemoryLimiterPermit> update(AsyncDualMemoryLimiterPermit permit, long newMemorySize);

    /**
     * Release a previously acquired permit.
     * Must be called to prevent memory permit leaks.
     */
    void release(AsyncDualMemoryLimiterPermit permit);
}
```
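As noted above, the dual limiter can be built on top of the `AsyncSemaphore` abstraction, one instance per memory type. The following is a minimal sketch of that composition; the class name, the permit wrapper, and the assumption that `AsyncDualMemoryLimiterPermit` is a marker-style interface are illustrative only, and the real implementation would also wire in the timeouts, queue sizes, and metrics described later:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: an AsyncDualMemoryLimiter composed of two AsyncSemaphore instances
public class DualMemoryLimiterSketch implements AsyncDualMemoryLimiter {

    // Hypothetical permit wrapper; it remembers which underlying semaphore it belongs to
    // so that update/release are routed correctly
    private static class Permit implements AsyncDualMemoryLimiterPermit {
        final AsyncSemaphore owner;
        AsyncSemaphorePermit inner;
        Permit(AsyncSemaphore owner, AsyncSemaphorePermit inner) {
            this.owner = owner;
            this.inner = inner;
        }
    }

    private final AsyncSemaphore heapLimiter;    // sized from maxTopicListInFlightHeapMemSizeMB
    private final AsyncSemaphore directLimiter;  // sized from maxTopicListInFlightDirectMemSizeMB

    public DualMemoryLimiterSketch(AsyncSemaphore heapLimiter, AsyncSemaphore directLimiter) {
        this.heapLimiter = heapLimiter;
        this.directLimiter = directLimiter;
    }

    @Override
    public CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, LimitType limitType) {
        AsyncSemaphore semaphore = limitType == LimitType.HEAP_MEMORY ? heapLimiter : directLimiter;
        return semaphore.acquire(memorySize).thenApply(p -> new Permit(semaphore, p));
    }

    @Override
    public CompletableFuture<AsyncDualMemoryLimiterPermit> update(AsyncDualMemoryLimiterPermit permit,
                                                                  long newMemorySize) {
        Permit p = (Permit) permit;
        return p.owner.update(p.inner, newMemorySize).thenApply(updated -> {
            p.inner = updated;
            return permit;
        });
    }

    @Override
    public void release(AsyncDualMemoryLimiterPermit permit) {
        Permit p = (Permit) permit;
        p.owner.release(p.inner);
    }
}
```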
1. Heap Memory Limiting (Post-Retrieval)
In `ServerCnx.handleGetTopicsOfNamespace`:
```java
// Acquire a fixed amount of permits initially since it's not known how much memory will be used
// This will ensure that the operation continues only after it has the initial permits
// It would be possible to use statistics for initial estimate, but this is simpler and sufficient
maxTopicListInFlightLimiter.acquire(INITIAL_SIZE, AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY)
    .thenCompose(initialPermit -> {
        getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode)
            .thenCompose(topics -> {
                // Estimate memory after retrieval and update the permits to reflect the actual size
                long estimatedSize = topics.stream().mapToInt(String::length).sum();
                return maxTopicListInFlightLimiter
                        .update(initialPermit, estimatedSize)
                        .thenApply(permit -> Pair.of(topics, permit));
            })
            .thenAccept(topicsAndPermit -> {
                try {
                    // Process and send response
                    ...
                } finally {
                    maxTopicListInFlightLimiter.release(topicsAndPermit.getRight());
                }
            });
        ...
        // For exceptional paths, initialPermit would need to be released
```
2. Direct Memory Limiting (Pre-Serialization)
Modified `CommandSender` implementation:
```java
@Override
public void sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash, boolean filtered,
                                             boolean changed, long requestId) {
    BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash, filtered,
            changed, requestId);
    safeIntercept(command, cnx);
    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
}

private void acquireMaxTopicListInFlightPermitsAndWriteAndFlush(BaseCommand command) {
    // Calculate serialized size before acquiring permits
    int serializedSize = command.getSerializedSize();
    // Acquire permits
    maxTopicListInFlightLimiter.acquire(serializedSize, AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY)
        .thenAcceptAsync(permits -> {
            try {
                // Serialize the response
                ByteBuf outBuf = Commands.serializeWithPrecalculatedSerializedSize(command, serializedSize);
                // Write the response
                cnx.ctx().writeAndFlush(outBuf).addListener(future -> {
                    // Release permits after the response has been written to the socket
                    maxTopicListInFlightLimiter.release(permits);
                });
            } catch (Exception e) {
                // Return permits if an exception occurs before writeAndFlush is called successfully
                maxTopicListInFlightLimiter.release(permits);
                throw e;
            }
        }, cnx.ctx().executor());
}
```
3. Watch Command Memory Control
Similar memory limiting patterns apply to watch commands:
```java
public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
    BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
}

public void sendWatchTopicListUpdate(long watcherId, List<String> newTopics, List<String> deletedTopics,
                                     String topicsHash) {
    BaseCommand command = Commands.newWatchTopicUpdate(watcherId, newTopics, deletedTopics, topicsHash);
    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
}
```
4. Proxy Reading Memory Control
On the Pulsar Proxy side, the problem is slightly different. It occurs when the proxy receives a `CommandGetTopicsOfNamespace` command, forwards it to a broker, and receives the response: the proxy needs to deserialize and re-serialize the response before sending it to the client, so memory is allocated for both deserialization and serialization.
Solving this requires a slight modification to PulsarDecoder.
In `PulsarDecoder.channelRead`, it would be necessary to record the size of the incoming message:
```java
// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
try {
    // De-serialize the command
    int cmdSize = (int) buffer.readUnsignedInt();
    cmd.parseFrom(buffer, cmdSize);
```
It could be modified to store the `cmdSize` in a field instead of a local variable:
```java
protected int cmdSize;
...

// Get a buffer that contains the full frame
ByteBuf buffer = (ByteBuf) msg;
try {
    // De-serialize the command
    cmdSize = (int) buffer.readUnsignedInt();
    cmd.parseFrom(buffer, cmdSize);
```
Changes would be needed to be able to use this serialized size so that it doesn't need to be re-calculated. `cmdSize` would be added as a field to `GetTopicsResult`:
```java
@Override
protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse success) {
    checkArgument(state == State.Ready);
    long requestId = success.getRequestId();
    List<String> topics = success.getTopicsList();

    if (log.isDebugEnabled()) {
        log.debug("{} Received get topics of namespace success response from server: {} - topics.size: {}",
                ctx.channel(), success.getRequestId(), topics.size());
    }

    CompletableFuture<GetTopicsResult> requestFuture =
            (CompletableFuture<GetTopicsResult>) pendingRequests.remove(requestId);
    if (requestFuture != null) {
        requestFuture.complete(new GetTopicsResult(topics,
                success.hasTopicsHash() ? success.getTopicsHash() : null,
                success.isFiltered(),
                success.isChanged(),
                // Store cmdSize in the GetTopicsResult <----
                cmdSize));
    } else {
        duplicatedResponseCounter.incrementAndGet();
        log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
    }
}
```
The limiter would be integrated into `LookupProxyHandler`'s `performGetTopicsOfNamespace` in this way:
```java
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
    // Connected to backend broker
    long requestId = proxyConnection.newRequestId();
    ByteBuf command;
    command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode, topicsPattern, topicsHash);

    // Acquire a fixed amount of permits initially since it's not known how much memory will be used
    // This will ensure that the operation continues only after it has the initial permits
    // It would be possible to use statistics for initial estimate, but this is simpler and sufficient
    maxTopicListInFlightLimiter.acquire(INITIAL_SIZE, AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY)
        .thenCompose(initialPermit -> {
            clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    maxTopicListInFlightLimiter.release(initialPermit);
                    log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
                            clientAddress, namespaceName, t.getMessage());
                    writeAndFlush(
                            Commands.newError(clientRequestId, getServerError(t), t.getMessage()));
                } else {
                    // Update the initial permits to reflect the actual size of the response
                    maxTopicListInFlightLimiter.update(initialPermit, r.getSerializedSize())
                        .thenCompose(heapPermit -> {
                            // Acquire a direct memory permit for serialization
                            maxTopicListInFlightLimiter.acquire(r.getSerializedSize(),
                                    AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY)
                                .thenAccept(directPermit -> {
                                    proxyConnection.ctx().writeAndFlush(
                                            Commands.newGetTopicsOfNamespaceResponse(
                                                    r.getNonPartitionedOrPartitionTopics(),
                                                    r.getTopicsHash(), r.isFiltered(), r.isChanged(),
                                                    clientRequestId)
                                    ).addListener(future -> {
                                        // Release permits after the response has been written to the socket
                                        maxTopicListInFlightLimiter.release(heapPermit);
                                        maxTopicListInFlightLimiter.release(directPermit);
                                    });
                                })
                                // Add exception handling for releasing directPermit
                        });
                        // Add exception handling for releasing heapPermit
                }
            });
        });
    proxyConnection.getConnectionPool().releaseConnection(clientCnx);
}).exceptionally(ex -> {
```
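Note that on this path the proxy ends up holding two permits at once: the heap permit sized to the deserialized `GetTopicsResult` and the direct memory permit sized to the re-serialized response, and both are released only after the response has been written to the client socket. As the comments in the sketch indicate, the exceptional paths also need to release whichever permits have already been acquired.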
broker.conf/proxy.conf additions to complete the memory management configuration set:
```properties
# Maximum heap memory for inflight topic list operations (MB)
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
maxTopicListInFlightHeapMemSizeMB=100

# Maximum direct memory for inflight topic list responses (MB)
# Default: 100 MB (network buffers for serialized responses)
maxTopicListInFlightDirectMemSizeMB=100

# Timeout for acquiring heap memory permits (milliseconds)
# Default: 25000 (25 seconds)
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for heap memory permit requests
# Default: 1000 (prevent unbounded queueing)
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=1000

# Timeout for acquiring direct memory permits (milliseconds)
# Default: 25000 (25 seconds)
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000

# Maximum queue size for direct memory permit requests
# Default: 1000 (prevent unbounded queueing)
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=1000
```
New metrics under the `pulsar_broker_topic_list_` / `pulsar_proxy_topic_list` prefixes, complementing existing memory metrics:
Metric Name | Type | Description | Labels |
---|---|---|---|
heap_memory_used_bytes | Gauge | Current heap memory used by topic listings | cluster |
heap_memory_limit_bytes | Gauge | Configured heap memory limit | cluster |
direct_memory_used_bytes | Gauge | Current direct memory used by topic listings | cluster |
direct_memory_limit_bytes | Gauge | Configured direct memory limit | cluster |
heap_queue_size | Gauge | Current heap memory limiter queue size | cluster |
heap_queue_max_size | Gauge | Maximum heap memory limiter queue size | cluster |
direct_queue_size | Gauge | Current direct memory limiter queue size | cluster |
direct_queue_max_size | Gauge | Maximum direct memory limiter queue size | cluster |
heap_wait_time_ms | Histogram | Wait time for heap memory permits | cluster |
direct_wait_time_ms | Histogram | Wait time for direct memory permits | cluster |
heap_timeout_total | Counter | Total heap memory permit timeouts | cluster |
direct_timeout_total | Counter | Total direct memory permit timeouts | cluster |
No changes to the REST API.
No protocol changes. Existing commands continue to work with added server-side memory limits and backpressure.
Operators should monitor the following metrics alongside existing memory management metrics and set up alerts:
- Memory Utilization Alert: `heap_memory_used_bytes / heap_memory_limit_bytes > 0.8`
- Queue Saturation Alert: `heap_queue_size / heap_queue_max_size > 0.9`
- Timeout Rate Alert: `rate(heap_timeout_total[5m]) > 1`
- P99 Wait Time Alert: `heap_wait_time_ms{quantile="0.99"} > 10000`
These alerts should be configured alongside existing memory alerts for `managedLedgerCacheSizeMB`, `maxMessagePublishBufferSizeInMB`, and other memory limits to provide comprehensive memory management visibility.
The memory limiting mechanism introduces new denial-of-service protections:
Resource Exhaustion Protection: The limits prevent bad clients from triggering OOM by requesting large topic lists repeatedly, completing the broker's defense against memory-based attacks.
Fair Queueing: The queue size limits prevent bad clients from monopolizing memory permits and blocking legitimate requests.
Multi-tenancy Isolation: Consider per-tenant memory limits in future iterations to prevent one tenant from consuming all available topic listing memory permits, similar to how other memory limits could benefit from tenant isolation.
```shell
# Start with high limits to understand current usage
pulsar-admin brokers update-dynamic-config --config maxTopicListInFlightHeapMemSizeMB --value 512
pulsar-admin brokers update-dynamic-config --config maxTopicListInFlightDirectMemSizeMB --value 512

# Monitor metrics and adjust downward based on actual usage patterns
pulsar-admin brokers update-dynamic-config --config maxTopicListInFlightHeapMemSizeMB --value 200
pulsar-admin brokers update-dynamic-config --config maxTopicListInFlightDirectMemSizeMB --value 200

pulsar-admin brokers update-dynamic-config --config maxTopicListInFlightHeapMemSizeMB --value 100
pulsar-admin brokers update-dynamic-config --config maxTopicListInFlightDirectMemSizeMB --value 100
```
`managedLedgerCacheSizeMB` or similar