A retry topic stores messages that the consumer failed to process, or cannot process at the moment. The consumer can retry such a message after a certain period of time. After several retries, the message is moved to the dead letter topic.
Each consumer instance contains a hidden retry topic producer. Each time the consumer calls consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS); to retry a message after 3 seconds, the hidden producer sends the corresponding message to the retry topic and acks the message in the original topic.
After several retries, the message is sent to the dead letter topic instead of the retry topic.
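The routing decision described above can be illustrated with a small, self-contained model. This is only a sketch, not the Pulsar client code: the class name `RetryRoutingSketch`, the `Target` enum, and the exact threshold comparison (the retry count exceeding `maxRedeliverCount`) are assumptions made for illustration.

```java
/** Simplified in-memory model of retry/dead letter routing (not the Pulsar client). */
public class RetryRoutingSketch {

    /** Hypothetical labels for the two possible destinations. */
    enum Target { RETRY_TOPIC, DEAD_LETTER_TOPIC }

    /**
     * Decides where a message goes when the consumer asks to retry it.
     *
     * @param previousRetries   how many times the message was already retried
     * @param maxRedeliverCount retry limit (assumed to mirror DeadLetterPolicy.maxRedeliverCount)
     */
    static Target route(int previousRetries, int maxRedeliverCount) {
        int attempt = previousRetries + 1;
        // Assumption: once the attempt number exceeds the limit, the message
        // goes to the dead letter topic instead of the retry topic.
        return attempt > maxRedeliverCount ? Target.DEAD_LETTER_TOPIC : Target.RETRY_TOPIC;
    }

    public static void main(String[] args) {
        int maxRedeliverCount = 2;
        for (int previousRetries = 0; previousRetries < 3; previousRetries++) {
            System.out.println("retry #" + (previousRetries + 1) + " -> "
                    + route(previousRetries, maxRedeliverCount));
        }
    }
}
```

With a limit of 2, the first two retry requests in this model go to the retry topic and the third goes to the dead letter topic. In each case the original message would also be acked in the input topic, which the model leaves out.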
Currently, the producer of the retry/dead letter topic cannot be configured. Instead, the client hard-codes it with the chunk message feature enabled and batching disabled, which can't handle many situations. For example, when the message throughput of the retry topic becomes considerable, the unbatched messages consume a large amount of resources. There is no reason to disable the batch message feature unconditionally.
For better control over the retry/dead letter topic feature, we can support configuring the producer of the retry/dead letter topic.
Add the DeadLetterProducerBuilderContext and DeadLetterProducerBuilderCustomizer interfaces:

    public interface DeadLetterProducerBuilderContext {
        /**
         * Returns the default name of topic for the dead letter or retry letter producer. This topic name is used
         * unless the DeadLetterProducerBuilderCustomizer overrides it.
         *
         * @return a {@code String} representing the default topic name
         */
        String getDefaultTopicName();

        /**
         * Returns the name of the input topic for which the dead letter or retry letter producer is being configured.
         *
         * @return a {@code String} representing the input topic name
         */
        String getInputTopicName();

        /**
         * Returns the name of the subscription for which the dead letter or retry letter producer is being configured.
         *
         * @return a {@code String} representing the subscription name
         */
        String getInputTopicSubscriptionName();

        /**
         * Returns the name of the consumer for which the dead letter or
         * retry letter producer is being configured.
         *
         * @return a {@code String} representing the consumer name
         */
        String getInputTopicConsumerName();
    }

    public interface DeadLetterProducerBuilderCustomizer {
        /**
         * Customize the given producer builder with settings specific to the topic context provided.
         *
         * @param context         the context containing information about the input topic and the subscription
         * @param producerBuilder the producer builder instance to be customized
         */
        void customize(DeadLetterProducerBuilderContext context, ProducerBuilder<byte[]> producerBuilder);
    }
Add two fields to DeadLetterPolicy:

    public class DeadLetterPolicy implements Serializable {
        /**
         * Customizer for configuring the producer builder for the retry letter topic.
         *
         * <p>This field holds a function that allows the caller to customize the producer builder
         * settings for the retry letter topic before the producer is created. The customization logic
         * can use the provided context (which includes input topic and subscription details) to adjust
         * configurations such as timeouts, batching, or message routing.
         */
        private DeadLetterProducerBuilderCustomizer retryLetterProducerBuilderCustomizer;

        /**
         * Customizer for configuring the producer builder for the dead letter topic.
         *
         * <p>This field holds a function that allows the caller to customize the producer builder
         * settings for the dead letter topic before the producer is created. Using the provided context,
         * implementations can perform specific adjustments that ensure the dead letter queue operates
         * with the appropriate configurations tailored for handling undeliverable messages.
         */
        private DeadLetterProducerBuilderCustomizer deadLetterProducerBuilderCustomizer;
    }
Use DeadLetterProducerBuilderCustomizer to customize the producer of the retry/dead letter topic like this:

    // enable batch
    DeadLetterProducerBuilderCustomizer producerBuilderCustomizer = (context, producerBuilder) -> {
        producerBuilder.enableBatching(true);
    };
    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(subscriptionName)
            .enableRetry(true)
            .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .retryLetterProducerBuilderCustomizer(producerBuilderCustomizer)
                    .build())
            .subscribe();
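To make the intended order of operations concrete, here is a minimal, self-contained sketch of how a client could apply such a customizer: defaults first, then the user's hook. It uses stand-in types rather than the Pulsar API; `CustomizerOrderSketch`, `ProducerSettings`, and the simplified `Customizer` interface are hypothetical names. Turning chunking off whenever batching is turned on is also an assumption of the sketch, made because the two features are not normally combined on one producer.

```java
/** Stand-in model of "defaults first, then customizer" (not the Pulsar client code). */
public class CustomizerOrderSketch {

    /** Hypothetical holder for the few producer settings this sketch tracks. */
    static class ProducerSettings {
        String topic;
        boolean batching;
        boolean chunking;
    }

    /** Simplified stand-in for DeadLetterProducerBuilderCustomizer. */
    interface Customizer {
        void customize(String inputTopicName, ProducerSettings settings);
    }

    /** Builds settings for the hidden producer, letting the customizer override defaults. */
    static ProducerSettings build(String inputTopicName, String defaultTopicName,
                                  Customizer customizer) {
        ProducerSettings settings = new ProducerSettings();
        // Current hard-coded behavior described in the proposal: chunking on, batching off.
        settings.topic = defaultTopicName;
        settings.chunking = true;
        settings.batching = false;
        // The customizer runs last, so anything it sets wins over the defaults.
        if (customizer != null) {
            customizer.customize(inputTopicName, settings);
        }
        return settings;
    }

    public static void main(String[] args) {
        // Topic names here are purely illustrative.
        ProducerSettings defaults = build("orders", "orders-sub-RETRY", null);
        ProducerSettings custom = build("orders", "orders-sub-RETRY", (topic, s) -> {
            s.batching = true;
            s.chunking = false; // assumption: do not combine batching with chunking
        });
        System.out.println("default batching=" + defaults.batching
                + ", customized batching=" + custom.batching);
    }
}
```

Running the customizer after the defaults keeps existing behavior unchanged for users who do not supply one, which is consistent with the compatibility statement that follows.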
Fully compatible.