| |
| # PIP-409: support producer configuration for retry/dead letter topic producer |
| |
| # Background knowledge |
| |
| A retry topic stores messages that the consumer failed to process, or cannot process at the moment. |
| The consumer can retry a message after a certain period of time. After several retries, the message is moved to the dead letter topic. |
| |
| Each consumer instance holds a hidden retry topic producer. Each time the consumer calls `consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS);` |
| to retry a message after 3 seconds, this hidden producer sends the message to the retry topic, and the consumer acknowledges the message in the original topic. |
| |
| After several retries, the message is sent to the dead letter topic instead of the retry topic. |
| |
| |
| # Motivation |
| |
| Currently, the producer of the retry/dead letter topic cannot be configured. Instead, the chunk message feature is enabled |
| and batching is disabled in hard-coded settings, which cannot handle many situations. For example, when the message |
| throughput of the retry topic becomes considerable, the un-batched messages consume a large amount of resources. |
| There is no reason to force the batch message feature off. |
| |
| To give users better control over the retry/dead letter topic feature, we can support configuring the producer of |
| the retry/dead letter topic. |
| |
| # Goals |
| |
| ## In Scope |
| |
| - Support configuration for the producer of retry/dead letter topic. |
| |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| |
| - Add the `DeadLetterProducerBuilderContext` and `DeadLetterProducerBuilderCustomizer` interfaces: |
| ```java |
| public interface DeadLetterProducerBuilderContext { |
|     /** |
|      * Returns the default name of the topic for the dead letter or retry letter producer. This topic name is used |
|      * unless the DeadLetterProducerBuilderCustomizer overrides it. |
|      * |
|      * @return a {@code String} representing the default topic name |
|      */ |
|     String getDefaultTopicName(); |
| |
|     /** |
|      * Returns the name of the input topic for which the dead letter or retry letter producer is being configured. |
|      * |
|      * @return a {@code String} representing the input topic name |
|      */ |
|     String getInputTopicName(); |
| |
|     /** |
|      * Returns the name of the subscription for which the dead letter or retry letter producer is being configured. |
|      * |
|      * @return a {@code String} representing the subscription name |
|      */ |
|     String getInputTopicSubscriptionName(); |
| |
|     /** |
|      * Returns the name of the consumer for which the dead letter or retry letter producer is being configured. |
|      * |
|      * @return a {@code String} representing the consumer name |
|      */ |
|     String getInputTopicConsumerName(); |
| } |
| |
| public interface DeadLetterProducerBuilderCustomizer { |
|     /** |
|      * Customize the given producer builder with settings specific to the topic context provided. |
|      * |
|      * @param context         the context containing information about the input topic and the subscription |
|      * @param producerBuilder the producer builder instance to be customized |
|      */ |
|     void customize(DeadLetterProducerBuilderContext context, ProducerBuilder<byte[]> producerBuilder); |
| } |
| ``` |
| |
| - Add two fields to `DeadLetterPolicy`: |
| ```java |
| public class DeadLetterPolicy implements Serializable { |
|     /** |
|      * Customizer for configuring the producer builder for the retry letter topic. |
|      * |
|      * <p>This field holds a function that allows the caller to customize the producer builder |
|      * settings for the retry letter topic before the producer is created. The customization logic |
|      * can use the provided context (which includes input topic and subscription details) to adjust |
|      * configurations such as timeouts, batching, or message routing. |
|      */ |
|     private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer; |
| |
|     /** |
|      * Customizer for configuring the producer builder for the dead letter topic. |
|      * |
|      * <p>This field holds a function that allows the caller to customize the producer builder |
|      * settings for the dead letter topic before the producer is created. Using the provided context, |
|      * implementations can perform specific adjustments that ensure the dead letter queue operates |
|      * with the appropriate configurations tailored for handling undeliverable messages. |
|      */ |
|     private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer; |
| } |
| ``` |
| |
| - Use the `DeadLetterProducerBuilderCustomizer` to customize the producer of the retry/dead letter topic, for example: |
| ```java |
| // Enable batching on the hidden retry letter topic producer. |
| DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> { |
|     producerBuilder.enableBatching(true); |
| }; |
| // Attach the customizer through the dead letter policy when building the consumer. |
| Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) |
|         .topic(topic) |
|         .subscriptionName(subscriptionName) |
|         .enableRetry(true) |
|         .deadLetterPolicy(DeadLetterPolicy.builder() |
|                 .maxRedeliverCount(maxRedeliveryCount) |
|                 .retryLetterProducerBuilderCustomizer(producerBuilderCustomizer) |
|                 .build()) |
|         .subscribe(); |
| ``` |
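
The interplay between the hard-coded defaults and a customizer can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the Pulsar client code: `ProducerSettings`, `Context`, `Customizer`, `context(...)` and `configure(...)` are hypothetical stand-ins invented for this example, the topic names are placeholders, and the ordering (defaults first, customizer last) is an assumption about how the consumer could apply the customizer. Only the batching default described in the Motivation section is modelled.

```java
public class RetryProducerDefaultsSketch {

    /** Hypothetical stand-in for the producer builder settings discussed in this PIP. */
    static final class ProducerSettings {
        String topic;
        boolean batchingEnabled;
    }

    /** Hypothetical stand-in for two getters of DeadLetterProducerBuilderContext. */
    interface Context {
        String getDefaultTopicName();
        String getInputTopicName();
    }

    /** Hypothetical stand-in for DeadLetterProducerBuilderCustomizer. */
    interface Customizer {
        void customize(Context context, ProducerSettings settings);
    }

    /** Builds a simple immutable context from two topic names. */
    static Context context(String defaultTopic, String inputTopic) {
        return new Context() {
            @Override
            public String getDefaultTopicName() {
                return defaultTopic;
            }

            @Override
            public String getInputTopicName() {
                return inputTopic;
            }
        };
    }

    /**
     * Assumed ordering: apply the defaults first, then let the optional
     * customizer override them, so the caller's settings win.
     */
    static ProducerSettings configure(Context context, Customizer customizer) {
        ProducerSettings settings = new ProducerSettings();
        settings.topic = context.getDefaultTopicName();
        settings.batchingEnabled = false; // the default described in the Motivation section
        if (customizer != null) {
            customizer.customize(context, settings);
        }
        return settings;
    }

    public static void main(String[] args) {
        // Placeholder topic names, chosen only for illustration.
        Context ctx = context("persistent://public/default/orders-sub-RETRY",
                "persistent://public/default/orders");

        ProducerSettings defaults = configure(ctx, null);
        ProducerSettings batched = configure(ctx, (c, s) -> s.batchingEnabled = true);

        System.out.println("without customizer, batching = " + defaults.batchingEnabled);
        System.out.println("with customizer, batching = " + batched.batchingEnabled);
    }
}
```

Under this assumed ordering, a consumer that sets no customizer keeps the current behaviour, which is consistent with the compatibility claim below.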
| |
| |
| # Backward & Forward Compatibility |
| |
| Fully compatible. |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/h6jjf9wn2h4zmpjw5zjtnl5ds1r4nknq |
| * Mailing List voting thread: https://lists.apache.org/thread/6wgxovk0f72d10zdgsnto6bkh6pwfzj1 |