PIP-315: Configurable max delay limit for delayed delivery

Background knowledge

Delayed message delivery is an important feature which allows a producer to specify that a message should be delivered/consumed at a later time. Currently the broker will save a delayed message without any check. The message's deliverAt time is checked when the broker dispatches messages to the Consumer. If a message has a deliverAt time, then it is added to the DelayedDeliveryTracker and will be delivered later when eligible.

Delayed message delivery is only available for persistent topics, and shared/key-shared subscription types.

Motivation

Currently there is no max delay limit, so a producer can specify any delay when publishing a message.

This poses a few challenges:

  1. A producer may miscalculate or misconfigure a very large delay (e.g. a 1,000-day delay instead of a 100-day delay)
  2. Pulsar administrators may want to limit the max allowed delay, since unacked messages (e.g. messages with a large delay) are retained until they are acknowledged (indefinitely, unless TTL is configured)
  3. The configured delay may be greater than the configured TTL, which means the delayed message may be deleted before its deliverAt time (before the consumer can process it)

Goals

The purpose of this PIP is to introduce an optional configuration to limit the max allowed delay for delayed delivery.

In Scope

  • Add broker configuration to limit the max allowed delay for delayed delivery
  • Configurable at broker/topic/namespace-level

High Level Design

We will add a maxDeliveryDelayInMillis configuration. If it is configured, the broker will check each incoming delayed message to see whether its delay (the message's deliverAt time minus its publish time) exceeds the configured limit. If it does, the broker will send an error back to the Producer.

Detailed Design

Design & Implementation Details

Broker Changes

A new maxDeliveryDelayInMillis config (named delayedDeliveryMaxDeliveryDelayInMillis in broker.conf) will be added to the broker, with a default of 0 (disabled). The default (disabled) behavior matches the current delayed delivery behavior (no limit on delivery delay).

# broker.conf
delayedDeliveryMaxDeliveryDelayInMillis=0

This field will also be added to the existing DelayedDeliveryPolicies interface to support topic & namespace-level configuration:

public interface DelayedDeliveryPolicies {
    long getMaxDeliveryDelayInMillis();
}
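
As a rough illustration of how a limit set at several levels might resolve to a single effective value, the sketch below assumes the most specific level wins (topic, then namespace, then broker). This precedence mirrors how Pulsar policies are usually layered but is an assumption here, not something this proposal specifies. The class and method names are hypothetical, and null stands for "not set at this level".

```java
final class EffectiveMaxDelay {
    /**
     * Hypothetical resolution order: topic policy, then namespace policy, then broker.conf.
     * A value of 0 at any level means "no limit" and, if set at a more specific level,
     * overrides a non-zero value from a broader level.
     */
    static long resolve(Long topicLevel, Long namespaceLevel, long brokerLevel) {
        if (topicLevel != null) {
            return topicLevel;
        }
        if (namespaceLevel != null) {
            return namespaceLevel;
        }
        return brokerLevel;
    }
}
```

Under this assumption, a topic that explicitly sets 0 would opt out of a namespace-level or broker-level limit, which is one way to leave retry topics unrestricted.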

The max delivery delay check will occur in the broker's Producer class inside of checkAndStartPublish (same place as other checks such as isEncryptionEnabled).

The broker will return a ServerError.NotAllowedError to the Producer if all of the following are true:

  1. Sending to a persistent topic
  2. Topic has delayedDeliveryEnabled=true
  3. MessageMetadata deliver_at_time has been specified
  4. Topic has >0 value for maxDeliveryDelayInMillis
  5. deliver_at_time - publish_time > maxDeliveryDelayInMillis

// In org.apache.pulsar.broker.service.Producer#checkAndStartPublish
if (topic.isPersistent()) {
    PersistentTopic pTopic = (PersistentTopic) topic;
    if (pTopic.isDelayedDeliveryEnabled()) {
        // Peek at the metadata without consuming the buffer: mark the reader index, parse, then reset.
        headersAndPayload.markReaderIndex();
        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
        headersAndPayload.resetReaderIndex();
        if (msgMetadata.hasDeliverAtTime()) {
            long maxDeliveryDelayInMillis = pTopic.getMaxDeliveryDelayInMillis();
            if (maxDeliveryDelayInMillis > 0
                    && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMillis) {
                cnx.execute(() -> {
                    cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
                            String.format("Exceeds max allowed delivery delay of %s milliseconds", maxDeliveryDelayInMillis));
                    cnx.completedSendOperation(false, headersAndPayload.readableBytes());
                });
                return false;
            }
        }
    }
}
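
To make the arithmetic of conditions 4 and 5 concrete, here is a minimal standalone sketch of just the comparison, separated from the broker classes. The class name, method name, and timestamps are hypothetical. The example reuses the 1,000-day versus 100-day scenario from the Motivation section.

```java
import java.util.concurrent.TimeUnit;

final class MaxDelayCheck {
    /** True only when a limit is configured (> 0) and the requested delay is strictly greater than it. */
    static boolean exceedsMaxDelay(long publishTimeMs, long deliverAtTimeMs, long maxDeliveryDelayInMillis) {
        return maxDeliveryDelayInMillis > 0
                && deliverAtTimeMs - publishTimeMs > maxDeliveryDelayInMillis;
    }

    public static void main(String[] args) {
        long limit = TimeUnit.DAYS.toMillis(100);   // 8,640,000,000 ms
        long publishTime = 1_700_000_000_000L;      // arbitrary example timestamp

        // A 1,000-day delay against a 100-day limit is rejected.
        System.out.println(exceedsMaxDelay(publishTime, publishTime + TimeUnit.DAYS.toMillis(1000), limit)); // true
        // A delay exactly equal to the limit is allowed, because the comparison is strict.
        System.out.println(exceedsMaxDelay(publishTime, publishTime + limit, limit)); // false
        // A limit of 0 disables the check entirely.
        System.out.println(exceedsMaxDelay(publishTime, publishTime + TimeUnit.DAYS.toMillis(1000), 0)); // false
    }
}
```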

Consumer Impact

The proposal does not involve any client changes. However, it is important to note that setting a max delivery delay may impact the Consumer, because the Consumer uses delayed delivery when republishing messages to the retry/DLQ topic (e.g. via the reconsumeLater API). The max Consumer retry delay will therefore be the same as the configured maxDeliveryDelayInMillis (if enabled).

A problem will occur if a max delivery delay is configured but a Consumer uses a larger custom retry delay. In this scenario, the publish to the retry topic will fail, so the Consumer will get stuck redelivering the message. To avoid this, a larger max delivery delay should be configured specifically for the Consumer's retry topic (or no delay limit should be used for retry topics).
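
Until such a configuration is in place, an application that knows the limit could cap its own retry delay before calling reconsumeLater. The sketch below is one possible application-side guard and is not part of this proposal. The class and method names are hypothetical, and the limit has to be supplied by the application, since the client does not receive it from the broker.

```java
import java.util.concurrent.TimeUnit;

final class RetryDelayGuard {
    /**
     * Converts the requested retry delay to milliseconds and caps it at the known limit.
     * A limit of 0 means no limit is configured, so the requested delay is returned unchanged.
     */
    static long clampRetryDelayMillis(long requestedDelay, TimeUnit unit, long maxDeliveryDelayInMillis) {
        long requestedMillis = unit.toMillis(requestedDelay);
        if (maxDeliveryDelayInMillis > 0 && requestedMillis > maxDeliveryDelayInMillis) {
            return maxDeliveryDelayInMillis;
        }
        return requestedMillis;
    }
}
```

The result could then be passed along as consumer.reconsumeLater(message, delayMillis, TimeUnit.MILLISECONDS), so the publish to the retry topic stays within the limit.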

A more elegant solution would require a protocol change (see Alternatives section below).

Public-facing Changes

Public API

The optional maxDeliveryDelayInMillis field will be added to the admin REST APIs for configuring topic/namespace policies:

  • POST /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery
  • POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery

And the corresponding GET APIs will show maxDeliveryDelayInMillis in the response:

  • GET /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery
  • GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery
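
As an illustration of what a namespace-level update might look like, the sketch below builds (but does not send) the POST request using the JDK's java.net.http API. Only the endpoint path and the maxDeliveryDelayInMillis field come from this proposal. The admin base URL, the tenant and namespace values, and the other two JSON fields (active and tickTime, assumed to be the existing delayed delivery policy fields) are assumptions for the example, as are the class and method names.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class DelayedDeliveryPolicyRequest {
    /** JSON body for the policy; "active" and "tickTime" are assumed existing fields with example values. */
    static String jsonBody(long maxDeliveryDelayInMillis) {
        return String.format(
                "{\"active\":true,\"tickTime\":1000,\"maxDeliveryDelayInMillis\":%d}",
                maxDeliveryDelayInMillis);
    }

    /** Builds the namespace-level POST request without sending it. */
    static HttpRequest forNamespace(String adminBaseUrl, String tenant, String namespace,
                                    long maxDeliveryDelayInMillis) {
        URI uri = URI.create(String.format(
                "%s/admin/v2/namespaces/%s/%s/delayedDelivery", adminBaseUrl, tenant, namespace));
        return HttpRequest.newBuilder()
                .uri(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody(maxDeliveryDelayInMillis)))
                .build();
    }
}
```

For example, forNamespace("http://localhost:8080", "public", "default", 86_400_000L) would target a one-day limit on a hypothetical local broker. Sending it would additionally require an HttpClient and whatever authentication the cluster uses.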

Configuration

Broker will have a new config in broker.conf:

# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which exceeds this max delay, then
# it will return an error to the producer.
# The default value is 0 which means there is no limit on the max delivery delay.
delayedDeliveryMaxDeliveryDelayInMillis=0

CLI

Both CmdTopics and CmdNamespaces will be updated to include this additional optional configuration.

Backward & Forward Compatibility

Revert

Reverting to a previous version simply removes this config and its limit, which restores the previous behavior (no limit on delivery delay).

Upgrade

We will default the value to 0/disabled (no limitation), so this is a backwards compatible change and will not cause any functional change when upgrading to this feature/version. This feature will only be applied once the config is changed.

If configured, the maxDeliveryDelayInMillis limitation will affect:

  1. Producers who publish messages with a delivery delay longer than the limit (PIP-26: 2.4.0+)
  2. Consumers who configure a retry delay longer than the limit when using a retry topic (PIP-58: 2.6.0+)

Alternatives

Add delayed delivery limit check at client-side

An alternative is to add the limit check on the client side, which requires a protocol change so that the client Producer/Consumer receives the delayed delivery configuration from the broker. The client Producer can then throw an exception if the caller provides a delay greater than the configured limit. The client Consumer can handle the case where the retry publish delay is greater than the configured limit more elegantly: it can fall back to using the limit instead of being stuck waiting for the limit to be increased.

This would still require the broker-side check as someone may be using a custom client. The main benefit is being able to elegantly handle the Consumer retry topic scenario.

If we were to make this protocol change, then it might make sense to also have the Producer check the delayedDeliveryEnabled config: if delayed delivery is disabled and the Producer tries to send a delayed message, an exception would be thrown to the caller. (Currently the broker just delivers the message immediately and returns no error to the Producer, which can be misleading.)

We would also need to add the client-side checks to other supported client libraries.

Since the scope of this alternative would be quite expansive, we may want to pursue this in a follow-up PIP instead of trying to address it all at once.

Links