Pulsar's delayed delivery feature allows producers to schedule messages to be delivered at a future time. To provide administrative control and prevent abuse, PIP-315 introduced a `maxDeliveryDelayInMillis` policy. While this provides an important safeguard by setting an upper bound, it does not address all administrative needs: there is currently no way to enforce a specific, non-overridable delay for all messages on a topic.
Pulsar's delayed message delivery relies on tracking individual messages until their delivery time. This tracking state is persisted as part of the subscription's cursor metadata, which records gaps, or “holes”, for unacknowledged messages. The Pulsar Managed Ledger has a hard limit on the number of disjoint unacknowledged ranges it can persist for a cursor, configured by `managedLedgerMaxUnackedRangesToPersist` (default: 10,000). A large volume of messages with widely varying delivery times can easily exhaust this capacity, because each delayed message can create a separate unacknowledged hole.
Once the `managedLedgerMaxUnackedRangesToPersist` limit is breached, the broker stops persisting the cursor's state and maintains it only in memory. This in-memory state is volatile and is lost entirely if the broker restarts for any reason. Upon restart, the broker's only source of truth is the last successfully persisted cursor position. That position does not account for the lost tracking information, so the broker must re-dispatch all messages that were being tracked in memory. The result is significant and hard-to-predict message duplication for downstream consumers.
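A small simulation illustrates how holes accumulate. The sketch below is a simplified model, not Pulsar's `ManagedCursor`, and it rests on several assumptions:

- Every message is acknowledged the moment it becomes deliverable.
- Entries are numbered in publish order.
- Each disjoint acknowledged range above the mark-delete position counts as one unit against `managedLedgerMaxUnackedRangesToPersist`.

The class and method names (`AckHoleSimulator`, `replay`) are hypothetical. The simulation also reports how many acknowledged entries sit above the mark-delete position. That number approximates what would be redelivered if only that position survived a restart.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical, simplified cursor model (not Pulsar's ManagedCursor): a mark-delete
// position plus individually acknowledged entries above it.
final class AckHoleSimulator {

    // Peak number of disjoint acknowledged ranges above the mark-delete position, and
    // peak number of acknowledged entries above it (entries that would be redelivered
    // if only the mark-delete position survived a restart).
    record Result(int maxAckedRanges, int maxAckedAboveMarkDelete) {}

    // deliverAt[i] is the delivery time of entry i; entries are numbered in publish order.
    // Assumes each message is acknowledged immediately when it becomes deliverable.
    static Result replay(long[] deliverAt) {
        int n = deliverAt.length;
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) {
            order[i] = i;
        }
        // Stable sort: entries with equal deliverAt keep their publish order.
        Arrays.sort(order, Comparator.comparingLong(i -> deliverAt[i]));

        boolean[] acked = new boolean[n];
        int markDelete = 0;   // lowest entry that is still unacknowledged
        int ranges = 0;       // disjoint acknowledged ranges above markDelete
        int ackedAbove = 0;   // acknowledged entries above markDelete
        int maxRanges = 0;
        int maxAckedAbove = 0;

        for (int i : order) {
            acked[i] = true;
            boolean rightAcked = i + 1 < n && acked[i + 1];
            if (i == markDelete) {
                // Filling the lowest hole merges the range on its right into the prefix.
                if (rightAcked) {
                    ranges--;
                }
                int previous = markDelete;
                while (markDelete < n && acked[markDelete]) {
                    markDelete++;
                }
                ackedAbove -= markDelete - previous - 1;
            } else {
                // i > markDelete: opens a new range, extends a neighbour, or merges two ranges.
                boolean leftAcked = acked[i - 1];
                ranges += 1 - (leftAcked ? 1 : 0) - (rightAcked ? 1 : 0);
                ackedAbove++;
            }
            maxRanges = Math.max(maxRanges, ranges);
            maxAckedAbove = Math.max(maxAckedAbove, ackedAbove);
        }
        return new Result(maxRanges, maxAckedAbove);
    }
}
```

With a fixed delay, `deliverAt` is non-decreasing in publish order. Acknowledgments therefore arrive contiguously, and both peaks stay at 0. When delivery order is close to random, as it is with `deliverAt[i] = i + delay` and delays spread over a wide window, the peak range count in this model approaches a quarter of the entries. For 100,000 messages, that is well beyond the 10,000 default.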
The lack of administrative controls on message delivery delays introduces critical risks to cluster stability and data integrity. This proposal aims to provide granular control at the topic and namespace levels by adding a new fixed delivery delay configuration:

- `InMemoryDelayedDeliveryTracker` and its `delayedDeliveryFixedDelayDetectionLookahead` feature (introduced in #16609, #17907), as the broker can avoid building a large, complex index of individual delayed messages.
- A new `fixed-delivery-delay` policy, configurable at the namespace and topic levels, to enforce a mandatory delivery delay.
- Updated `/delayedDelivery` admin API endpoints and `pulsar-admin` commands to manage both the existing `maxDeliveryDelayInMillis` and the new `fixedDeliveryDelayInMillis` policies within the same policy group.
- The `fixed-delivery-delay` policy takes precedence over any client-specified delay and over the existing `max-delivery-delay` policy.

This proposal will enhance the existing `DelayedDeliveryPolicies` object to include a new `fixedDeliveryDelayInMillis` field. This new policy will follow Pulsar's standard hierarchical model, allowing it to be set at the namespace level and overridden at the topic level. The core logic will be implemented with the following precedence:
- If `fixed-delivery-delay` is set: the broker will ignore any `deliverAt` time sent by the producer and override it with `publish_time + fixed_delay`. The `max-delivery-delay` policy is ignored.
- If `fixed-delivery-delay` is NOT set, but `max-delivery-delay` is: the broker will validate the producer's requested `deliverAt` time against the `max-delivery-delay` policy, rejecting the message if it exceeds the limit.
- If neither policy is set: the broker will honor the producer's requested `deliverAt` time, constrained only by the global broker-level setting.

Data Model Changes:
- Add a new `fixedDeliveryDelayInMillis` field to the `DelayedDeliveryPolicies.java` class. This class is already used for the existing `maxDeliveryDelayInMillis` policy.
- Update `HierarchyTopicPolicies.java` to resolve the effective `fixedDeliveryDelayInMillis` for a topic, respecting the topic-over-namespace hierarchy.

Enforcement Logic:
- The new check will be added in `PersistentTopic.java`, within the `publishMessage` and `publishTxnMessage` methods, right before the existing `isExceedMaximumDeliveryDelay` check.
- When the fixed delay applies, the overridden `deliverAt` time will be written into the `MessageMetadata` of the incoming message before it is passed to the managed ledger for persistence.

| Method | Endpoint | Description |
|---|---|---|
| POST | `/admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | Sets or updates the delayed delivery policies for the topic. To disable a policy, set its field to `0`. |
| GET | `/admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | Gets the delayed delivery policies configured for the topic, including `fixedDeliveryDelayInMillis` if it is configured. |
| Method | Endpoint | Description |
|---|---|---|
| POST | `/admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | Sets or updates the delayed delivery policies for the namespace. To disable a policy, set its field to `0`. |
| GET | `/admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | Gets the delayed delivery policies configured for the namespace, including `fixedDeliveryDelayInMillis` if it is configured. |
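For illustration, a client could call these endpoints with the JDK's `java.net.http` client. The helper below is hypothetical and only builds the requests. It assumes the JSON body mirrors the existing `DelayedDeliveryPolicies` fields (`active`, `tickTime`, `maxDeliveryDelayInMillis`) plus the proposed `fixedDeliveryDelayInMillis`. It omits authentication headers.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

// Hypothetical helper: builds (but does not send) requests for the /delayedDelivery endpoints.
final class DelayedDeliveryRequests {

    // Assumed JSON shape: existing DelayedDeliveryPolicies fields plus the proposed
    // fixedDeliveryDelayInMillis. A value of 0 disables the corresponding policy.
    static String policyJson(boolean active, long tickTimeMillis,
                             long maxDeliveryDelayInMillis, long fixedDeliveryDelayInMillis) {
        return String.format(Locale.ROOT,
                "{\"active\":%b,\"tickTime\":%d,\"maxDeliveryDelayInMillis\":%d,"
                        + "\"fixedDeliveryDelayInMillis\":%d}",
                active, tickTimeMillis, maxDeliveryDelayInMillis, fixedDeliveryDelayInMillis);
    }

    // Topic-level endpoint from the first table.
    static URI topicUri(String baseUrl, String tenant, String namespace, String topic) {
        return URI.create(String.format("%s/admin/v2/persistent/%s/%s/%s/delayedDelivery",
                baseUrl, tenant, namespace, topic));
    }

    // Namespace-level endpoint from the second table.
    static URI namespaceUri(String baseUrl, String tenant, String namespace) {
        return URI.create(String.format("%s/admin/v2/namespaces/%s/%s/delayedDelivery",
                baseUrl, tenant, namespace));
    }

    // POST sets or updates the policies at the given level.
    static HttpRequest set(URI uri, String json) {
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // GET returns the policies configured at the given level.
    static HttpRequest get(URI uri) {
        return HttpRequest.newBuilder(uri).GET().build();
    }
}
```

A request built this way could be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. The sketch sends the full policy object on every `POST`. This avoids resetting unrelated fields, under the assumption that the endpoint replaces the stored policy as a whole.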
- The `set-delayed-delivery` command in `CmdTopicPolicies.java` and `CmdNamespaces.java` will be updated with a new optional parameter: `--fixed-delay` (or `-fd`).
- The `/delayedDelivery` REST endpoints will be updated to accept and return the new `fixedDeliveryDelayInMillis` field in their JSON payload.

To provide visibility into the enforcement of the message delay policies, we will introduce a new counter metric for `maxDeliveryDelayInMillis`:
- Name: `pulsar.broker.topic.messages.delayed.rejected`
- Description: The number of messages rejected because their requested delivery delay exceeded the configured maximum (`max-delivery-delay`). This helps administrators monitor when the policy is being enforced and identify misconfigured producers.
- Attributes:
  - `pulsar.cluster`: The cluster where the broker is running.
  - `pulsar.namespace`: The namespace of the topic.
  - `pulsar.topic`: The specific topic where the message was rejected.
- Unit: `{message}` (a standard unit for a count of messages).

To provide visibility into the enforcement of the fixed message delay policy, a new counter metric will be introduced for `fixedDeliveryDelayInMillis`:
- Name: `pulsar.broker.topic.messages.fixed.delay.overridden`
- Description: The number of messages whose client-specified `deliverAt` time was overridden by the topic's `fixed-delivery-delay` policy. This provides direct observability into how often the policy is being enforced against client-side settings.
- Attributes:
  - `pulsar.cluster`: The cluster where the broker is running.
  - `pulsar.namespace`: The namespace of the topic.
  - `pulsar.topic`: The specific topic where the message was overridden.
- Unit: `{message}`.

Administrators can use the new `pulsar.broker.topic.messages.delayed.rejected` metric to monitor the health and usage of the delayed delivery feature. A sudden spike in this metric could indicate that a `max-delivery-delay` policy set at the namespace or topic level is now affecting existing producers.

A high or unexpected count for `pulsar.broker.topic.messages.fixed.delay.overridden` indicates that producer applications are sending messages with a `deliverAt` time to a topic that has a `fixed-delivery-delay` policy. The policy is still enforced correctly in this case, but the metric helps operators identify clients that may be misconfigured or unaware of the topic's enforced behavior.
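The precedence rules and the two counters fit together in a single decision step. The following is a minimal sketch, not the actual `PersistentTopic` change, and it depends on these assumptions:

- `DelayPolicyEnforcer`, `Outcome`, `Decision`, `effective`, and `decide` are hypothetical names.
- The `LongAdder` fields stand in for the proposed metrics.
- A resolved value of `0` means the policy is not set.
- A message exceeds `max-delivery-delay` when `deliverAt - publishTime` is greater than the limit.

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch of the proposed precedence rules; names are hypothetical and this
// is not the actual PersistentTopic code. A resolved value of 0 means "policy not set".
final class DelayPolicyEnforcer {

    enum Outcome { FIXED_DELAY_APPLIED, CLIENT_DELAY_OVERRIDDEN, ACCEPTED, REJECTED }

    // deliverAtMillis is empty when the message should be dispatched immediately.
    record Decision(Outcome outcome, OptionalLong deliverAtMillis) {}

    // Stand-ins for the two proposed per-topic counters.
    final LongAdder delayedRejected = new LongAdder();      // ...messages.delayed.rejected
    final LongAdder fixedDelayOverridden = new LongAdder(); // ...messages.fixed.delay.overridden

    // Topic-level value overrides namespace-level; otherwise falls back to a broker default.
    // Assumption: an explicit topic-level 0 disables the policy instead of falling back.
    static long effective(Long topicValue, Long namespaceValue, long brokerDefault) {
        if (topicValue != null) {
            return topicValue;
        }
        if (namespaceValue != null) {
            return namespaceValue;
        }
        return brokerDefault;
    }

    Decision decide(long publishTimeMillis, OptionalLong requestedDeliverAt,
                    long fixedDelayMillis, long maxDelayMillis) {
        // 1. A fixed delay wins: ignore the client's deliverAt and max-delivery-delay.
        if (fixedDelayMillis > 0) {
            OptionalLong deliverAt = OptionalLong.of(publishTimeMillis + fixedDelayMillis);
            if (requestedDeliverAt.isPresent()) {
                fixedDelayOverridden.increment();
                return new Decision(Outcome.CLIENT_DELAY_OVERRIDDEN, deliverAt);
            }
            return new Decision(Outcome.FIXED_DELAY_APPLIED, deliverAt);
        }
        // 2. Otherwise validate the client's requested delay against max-delivery-delay.
        if (maxDelayMillis > 0 && requestedDeliverAt.isPresent()
                && requestedDeliverAt.getAsLong() - publishTimeMillis > maxDelayMillis) {
            delayedRejected.increment();
            return new Decision(Outcome.REJECTED, requestedDeliverAt);
        }
        // 3. No policy applies: honor the client's deliverAt, if any.
        return new Decision(Outcome.ACCEPTED, requestedDeliverAt);
    }
}
```

In this sketch, only a client-supplied `deliverAt` that gets replaced increments the override counter. Messages sent without a delay to a fixed-delay topic are delayed without being counted, which matches the metric's purpose of tracking overridden client settings.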
Downgrading to a version without this feature is supported, but the new `fixed-delivery-delay` policy will no longer be enforced. Brokers will revert to honoring the producer's `deliverAt` time or enforcing the `max-delivery-delay` policy.

Before starting the rollback process, disable the `fixed-delivery-delay` policy on all relevant topics and namespaces. This can be done by setting the fixed delay value to `0` using the `pulsar-admin ... set-delayed-delivery` command.

The system shall guarantee that changes to delayed delivery policies are applied atomically across all active brokers. This ensures that the cluster does not operate in a mixed-policy state during either an upgrade or a downgrade procedure.

During a geo-replication upgrade, ensure that all clusters are upgraded before relying on the consistency of the new delayed delivery policies. If downgrading a geo-replicated cluster, remove the new topic-level configurations before downgrading to prevent inconsistencies between clusters.