blob: 7a707486c226d7711edb9d40cda568e5aab9eca4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.api;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import reactor.core.scheduler.Scheduler;
/**
* Builder interface for {@link ReactiveMessageConsumer}.
*
* @param <T> the message payload type
*/
public interface ReactiveMessageConsumerBuilder<T> {
/**
* Apply a consumer spec to configure the consumer.
* @param consumerSpec the consumer spec to apply
* @return the consumer builder instance
*/
default ReactiveMessageConsumerBuilder<T> applySpec(ReactiveMessageConsumerSpec consumerSpec) {
getMutableSpec().applySpec(consumerSpec);
return this;
}
/**
* Converts this builder to an immutable reactive consumer spec.
* @return the reactive consumer spec.
*/
default ReactiveMessageConsumerSpec toImmutableSpec() {
return new ImmutableReactiveMessageConsumerSpec(getMutableSpec());
}
/**
* Converts this builder to a mutable reactive consumer spec.
* @return the reactive consumer spec.
*/
MutableReactiveMessageConsumerSpec getMutableSpec();
/**
* Creates and returns a copy of this reactive consumer builder.
* @return the cloned reactive reader builder
*/
ReactiveMessageConsumerBuilder<T> clone();
/**
* Adds a topic this consumer will subscribe on.
* @param topicName a topic that the consumer will subscribe on
* @return the consumer builder instance
* @see ConsumerBuilder#topic(String...)
*/
default ReactiveMessageConsumerBuilder<T> topic(String topicName) {
getMutableSpec().getTopicNames().add(topicName);
return this;
}
/**
* Adds topics this consumer will subscribe on.
* @param topicNames a set of topic that the consumer will subscribe on
* @return the consumer builder instance
* @see ConsumerBuilder#topic(String...)
*/
default ReactiveMessageConsumerBuilder<T> topic(String... topicNames) {
for (String topicName : topicNames) {
getMutableSpec().getTopicNames().add(topicName);
}
return this;
}
/**
* Sets the topics this consumer will subscribe on.
* @param topicNames a set of topic that the consumer will subscribe on
* @return the consumer builder instance
* @see ConsumerBuilder#topics(List)
*/
default ReactiveMessageConsumerBuilder<T> topics(List<String> topicNames) {
getMutableSpec().setTopicNames(topicNames);
return this;
}
/**
* Sets a pattern for topics that this consumer will subscribe on.
*
* <p>
* The pattern will be applied to subscribe to all the topics, within a single
* namespace, that will match the pattern.
*
* <p>
* The consumer will automatically subscribe to topics created after itself.
* @param topicsPattern a regular expression to select a list of topics to subscribe
* to
* @return the consumer builder instance
* @see ConsumerBuilder#topicsPattern(Pattern)
*/
default ReactiveMessageConsumerBuilder<T> topicsPattern(Pattern topicsPattern) {
getMutableSpec().setTopicsPattern(topicsPattern);
return this;
}
/**
* Sets to which topics this consumer should be subscribed to - Persistent,
* Mon-Persistent, or both. Only used with pattern subscriptions.
* @param topicsPatternSubscriptionMode pattern subscription mode
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionTopicsMode(RegexSubscriptionMode)
*/
default ReactiveMessageConsumerBuilder<T> topicsPatternSubscriptionMode(
RegexSubscriptionMode topicsPatternSubscriptionMode) {
getMutableSpec().setTopicsPatternSubscriptionMode(topicsPatternSubscriptionMode);
return this;
}
/**
* Sets the topics auto discovery period when using a pattern for topics consumer.
* @param topicsPatternAutoDiscoveryPeriod duration between checks for new topics
* matching pattern set with {@link #topicsPattern(Pattern)}
* @return the consumer builder instance
* @see ConsumerBuilder#patternAutoDiscoveryPeriod(int, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> topicsPatternAutoDiscoveryPeriod(
Duration topicsPatternAutoDiscoveryPeriod) {
getMutableSpec().setTopicsPatternAutoDiscoveryPeriod(topicsPatternAutoDiscoveryPeriod);
return this;
}
/**
* Sets the subscription name for this consumer.
* <p>
* This argument is required when constructing the consumer.
* @param subscriptionName the name of the subscription that this consumer should
* attach to
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionName(String)
*/
default ReactiveMessageConsumerBuilder<T> subscriptionName(String subscriptionName) {
getMutableSpec().setSubscriptionName(subscriptionName);
return this;
}
/**
* Sets the subscription mode to be used when subscribing to the topic.
*
* <p>
* Options are:
* <ul>
* <li>{@link SubscriptionMode#Durable} (Default)</li>
* <li>{@link SubscriptionMode#NonDurable}</li>
* </ul>
* @param subscriptionMode the subscription mode value
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionMode(SubscriptionMode)
*/
default ReactiveMessageConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode) {
getMutableSpec().setSubscriptionMode(subscriptionMode);
return this;
}
/**
* Sets the subscription type to be used when subscribing to the topic.
*
* <p>
* Options are:
* <ul>
* <li>{@link SubscriptionType#Exclusive} (Default)</li>
* <li>{@link SubscriptionType#Failover}</li>
* <li>{@link SubscriptionType#Shared}</li>
* </ul>
* @param subscriptionType the subscription type value
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionType(SubscriptionType)
*/
default ReactiveMessageConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType) {
getMutableSpec().setSubscriptionType(subscriptionType);
return this;
}
/**
* Sets the initial position of the subscription for the consumer.
* @param subscriptionInitialPosition the position where to initialize a newly created
* subscription
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionInitialPosition(SubscriptionInitialPosition)
*/
default ReactiveMessageConsumerBuilder<T> subscriptionInitialPosition(
SubscriptionInitialPosition subscriptionInitialPosition) {
getMutableSpec().setSubscriptionInitialPosition(subscriptionInitialPosition);
return this;
}
/**
* Sets the KeyShared subscription policy for the consumer.
*
* <p>
* By default, KeyShared subscription uses auto split hash range to maintain
* consumers. If you want to set a different KeyShared policy, you can set it like
* this:
*
* <pre>
* client.messageConsumer(Schema.BYTES)
* .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 10)))
* .build();
* </pre> For details about sticky hash range policy, please see
* {@link KeySharedPolicy.KeySharedPolicySticky}.
*
* <p>
* Or <pre>
* client.messageConsumer(Schema.BYTES)
* .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
* .build();
* </pre> For details about auto split hash range policy, please see
* {@link KeySharedPolicy.KeySharedPolicyAutoSplit}.
* @param keySharedPolicy the KeyShared policy to set
* @return the consumer builder instance
* @see ConsumerBuilder#keySharedPolicy(KeySharedPolicy)
*/
default ReactiveMessageConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy) {
getMutableSpec().setKeySharedPolicy(keySharedPolicy);
return this;
}
/**
* Sets whether the subscription shall be replicated.
* @param replicateSubscriptionState whether the subscription shall be replicated
* @return the consumer builder instance
* @see ConsumerBuilder#replicateSubscriptionState(boolean)
*/
default ReactiveMessageConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState) {
getMutableSpec().setReplicateSubscriptionState(replicateSubscriptionState);
return this;
}
/**
* Sets the subscription properties for this subscription. Properties are immutable,
* and consumers under the same subscription will fail to create a subscription if
* they use different properties.
* @param subscriptionProperties the subscription properties to set
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionProperties(Map)
*/
default ReactiveMessageConsumerBuilder<T> subscriptionProperties(Map<String, String> subscriptionProperties) {
getMutableSpec().setSubscriptionProperties(subscriptionProperties);
return this;
}
/**
* Adds a subscription property for this subscription. Properties are immutable, and
* consumers under the same subscription will fail to create a subscription if they
* use different properties.
* @param key the key of the property to add
* @param value the value of the property to add
* @return the consumer builder instance
* @see ConsumerBuilder#subscriptionProperties(Map)
*/
default ReactiveMessageConsumerBuilder<T> subscriptionProperty(String key, String value) {
if (getMutableSpec().getSubscriptionProperties() == null) {
getMutableSpec().setSubscriptionProperties(new LinkedHashMap<>());
}
getMutableSpec().getSubscriptionProperties().put(key, value);
return this;
}
/**
* Sets the consumer name.
*
* <p>
* Consumer name is informative and it can be used to identify a particular consumer
* instance from the topic stats.
* @param consumerName the consumer name
* @return the consumer builder instance
* @see ConsumerBuilder#consumerName(String)
*/
default ReactiveMessageConsumerBuilder<T> consumerName(String consumerName) {
getMutableSpec().setConsumerName(consumerName);
return this;
}
/**
* Sets the properties for the consumer.
*
* <p>
* Properties are application defined metadata that can be attached to the consumer.
* When getting the topic stats, this metadata will be associated to the consumer
* stats for easier identification.
* @param properties the properties to set
* @return the consumer builder instance
* @see ConsumerBuilder#properties(Map)
*/
default ReactiveMessageConsumerBuilder<T> properties(Map<String, String> properties) {
getMutableSpec().setProperties(properties);
return this;
}
/**
* Adds a property to the consumer.
*
* <p>
* Properties are application defined metadata that can be attached to the consumer.
* When getting the topic stats, this metadata will be associated to the consumer
* stats for easier identification.
* @param key the key of the property to add
* @param value the value of the property to add
* @return the consumer builder instance
* @see ConsumerBuilder#property(String, String)
*/
default ReactiveMessageConsumerBuilder<T> property(String key, String value) {
if (getMutableSpec().getProperties() == null) {
getMutableSpec().setProperties(new LinkedHashMap<>());
}
getMutableSpec().getProperties().put(key, value);
return this;
}
/**
* Sets the priority level for the consumer.
* <p>
* <b>Shared subscription</b> Sets the priority level for the shared subscription
* consumers to which the broker gives more priority while dispatching messages. Here,
* the broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
*
* <p>
* In Shared subscription mode, the broker will first dispatch messages to max
* priority-level consumers if they have permits, else the broker will consider next
* priority level consumers.
*
* <p>
* If the subscription has consumer-A with priorityLevel 0 and consumer-B with
* priorityLevel 1 then the broker will dispatch messages only to consumer-A until it
* runs out of permits, then the broker will start dispatching messages to consumer-B.
*
* <pre>
* Consumer PriorityLevel Permits
* C1 0 2
* C2 0 1
* C3 0 1
* C4 1 2
* C5 1 1
* Order in which the broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
* </pre>
*
* <p>
* <b>Failover subscription</b> The broker selects the active consumer for a
* failover-subscription based on the consumer priority-level and lexicographical
* sorting of consumer names. eg: <pre>
* 1. Active consumer = C1 : Same priority-level and lexicographical sorting
* Consumer PriorityLevel Name
* C1 0 aaa
* C2 0 bbb
*
* 2. Active consumer = C2 : Consumer with highest priority
* Consumer PriorityLevel Name
* C1 1 aaa
* C2 0 bbb
*
* Partitioned-topics:
* The broker evenly assigns partitioned topics to highest priority consumers.
* </pre>
* @param priorityLevel the priority level of this consumer
* @return the consumer builder instance
* @see ConsumerBuilder#priorityLevel(int)
*/
default ReactiveMessageConsumerBuilder<T> priorityLevel(Integer priorityLevel) {
getMutableSpec().setPriorityLevel(priorityLevel);
return this;
}
/**
* Sets whether the consumer will read messages from the compacted topic rather than
* reading the full message backlog of the topic. This means that, if the topic has
* been compacted, the consumer will only see the latest value for each key in the
* topic, up until the point in the topic message backlog that has been compacted.
* Beyond that point, the messages will be sent as normal.
*
* <p>
* readCompacted can only be enabled for subscriptions to persistent topics, which
* have a single active consumer (i.e. failure or exclusive subscriptions). Attempting
* to enable it on subscriptions to a non-persistent topic or on a shared
* subscription, will lead to the subscription call throwing a PulsarClientException.
* @param readCompacted whether to read from the compacted topic
* @return the consumer builder instance
* @see ConsumerBuilder#readCompacted(boolean)
*/
default ReactiveMessageConsumerBuilder<T> readCompacted(boolean readCompacted) {
getMutableSpec().setReadCompacted(readCompacted);
return this;
}
/**
* Enables or disables the batch index acknowledgment. For the batch index
* acknowledgment feature to work, it must also be enabled on the broker.
* @param batchIndexAckEnabled whether to enable batch index acknowledgment
* @return the consumer builder instance
* @see ConsumerBuilder#enableBatchIndexAcknowledgment(boolean)
*/
default ReactiveMessageConsumerBuilder<T> batchIndexAckEnabled(boolean batchIndexAckEnabled) {
getMutableSpec().setBatchIndexAckEnabled(batchIndexAckEnabled);
return this;
}
/**
* Sets the timeout for unacknowledged messages, truncated to the nearest millisecond.
* The timeout needs to be greater than 1 second.
*
* <p>
* By default, the acknowledgement timeout is disabled and that means that messages
* delivered to a consumer will not be re-delivered unless the consumer crashes.
*
* <p>
* When enabling the acknowledgement timeout, if a message is not acknowledged within
* the specified timeout it will be re-delivered to the consumer (possibly to a
* different consumer in case of a shared subscription).
* @param ackTimeout the timeout for unacknowledged messages.
* @return the consumer builder instance
* @see ConsumerBuilder#ackTimeout(long, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> ackTimeout(Duration ackTimeout) {
getMutableSpec().setAckTimeout(ackTimeout);
return this;
}
/**
* Sets the granularity of the ack-timeout redelivery.
*
* <p>
* By default, the tick time is set to 1 second. Using a higher tick time will reduce
* the memory overhead to track messages when the ack-timeout is set to bigger values
* (eg: 1hour).
* @param ackTimeoutTickTime the minimum precision for the acknowledgement timeout
* messages tracker
* @return the consumer builder instance
* @see ConsumerBuilder#ackTimeoutTickTime(long, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> ackTimeoutTickTime(Duration ackTimeoutTickTime) {
getMutableSpec().setAckTimeoutTickTime(ackTimeoutTickTime);
return this;
}
/**
* Sets the duration for grouping the acknowledgement messages.
*
* <p>
* By default, the consumer will use a 100 ms grouping time to send the
* acknowledgements to the broker.
*
* <p>
* Setting a group time of 0, will send the acknowledgements immediately. A longer
* acknowledgement group time will be more efficient at the expense of a slight
* increase in message re-deliveries after a failure.
* @param acknowledgementsGroupTime the duration for grouping the acknowledgement
* messages
* @return the consumer builder instance
* @see ConsumerBuilder#acknowledgmentGroupTime(long, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> acknowledgementsGroupTime(Duration acknowledgementsGroupTime) {
getMutableSpec().setAcknowledgementsGroupTime(acknowledgementsGroupTime);
return this;
}
/**
* When set to true, ignores the acknowledge operation completion and makes it
* asynchronous from the message consuming processing to improve performance by
* allowing the acknowledges and message processing to interleave. Defaults to true.
* @param acknowledgeAsynchronously when set to true, ignores the acknowledge
* operation completion
* @return the consumer builder instance
*/
default ReactiveMessageConsumerBuilder<T> acknowledgeAsynchronously(boolean acknowledgeAsynchronously) {
getMutableSpec().setAcknowledgeAsynchronously(acknowledgeAsynchronously);
return this;
}
/**
* Sets the scheduler to use to handle acknowledgements.
* @param acknowledgeScheduler the scheduler to use to handle acknowledgements
* @return the consumer builder instance
*/
default ReactiveMessageConsumerBuilder<T> acknowledgeScheduler(Scheduler acknowledgeScheduler) {
getMutableSpec().setAcknowledgeScheduler(acknowledgeScheduler);
return this;
}
/**
* Sets the delay to wait before re-delivering messages that have failed to be
* processed.
*
* <p>
* When the application uses {@link MessageResult#negativeAcknowledge(Message)}, the
* failed message will be redelivered after a fixed timeout. The default is 1 min.
* @param negativeAckRedeliveryDelay the redelivery delay for failed messages
* @return the consumer builder instance
* @see MessageResult#negativeAcknowledge(Message)
* @see ConsumerBuilder#negativeAckRedeliveryDelay(long, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> negativeAckRedeliveryDelay(Duration negativeAckRedeliveryDelay) {
getMutableSpec().setNegativeAckRedeliveryDelay(negativeAckRedeliveryDelay);
return this;
}
/**
* Sets a dead letter policy for the consumer.
*
* <p>
* By default, messages are redelivered indefinitely if they are not acknowledged. By
* using a dead letter mechanism, messages that have reached the max redelivery count
* will be acknowledged automatically and send to the configured dead letter topic.
*
* <p>
* You can enable the dead letter mechanism by setting a dead letter policy. Example:
* <pre>
* client.messageConsumer(Schema.BYTES)
* .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
* .build();
* </pre> Default the dead letter topic name is {TopicName}-{Subscription}-DLQ. You
* can set a custom dead letter topic name like this: <pre>
* client.messageConsumer(Schema.BYTES)
* .deadLetterPolicy(DeadLetterPolicy
* .builder()
* .maxRedeliverCount(10)
* .deadLetterTopic("your-topic-name")
* .build())
* .build();
* </pre> When a dead letter policy is specified, and no acknowledgement timeout is
* specified, then the acknowledgement timeout will be set to 30000 millisecond.
* @param deadLetterPolicy the dead letter policy to use
* @return the consumer builder instance
* @see ConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)
*/
default ReactiveMessageConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
getMutableSpec().setDeadLetterPolicy(deadLetterPolicy);
return this;
}
/**
* Sets whether automatic routing to retry letter topic and dead letter topic are
* enabled.
* @param retryLetterTopicEnable whether to automatic retry/dead-letter topics are
* enabled
* @return the consumer builder instance
* @see ConsumerBuilder#enableRetry(boolean)
*/
default ReactiveMessageConsumerBuilder<T> retryLetterTopicEnable(boolean retryLetterTopicEnable) {
getMutableSpec().setRetryLetterTopicEnable(retryLetterTopicEnable);
return this;
}
/**
* Sets the size of the consumer receiver queue.
*
* <p>
* The consumer receiver queue controls how many messages can be accumulated by the
* {@link ReactiveMessageConsumer} before backpressure triggers. Using a higher value
* could potentially increase the consumer throughput at the expense of bigger memory
* utilization.
*
* <p>
* <b>Setting the consumer queue size to zero</b>
* <ul>
* <li>Decreases the throughput of the consumer, by disabling pre-fetching of
* messages. This approach improves the message distribution on shared subscription,
* by pushing messages only to the consumers that are ready to process them.</li>
* <li>Doesn't support batched messages: if the consumer receives any batched message,
* it will close the connection with the broker and
* {@link ReactiveMessageConsumer#consumeOne},
* {@link ReactiveMessageConsumer#consumeMany} will emit an error. <b> the consumer
* will not be able receive any further message unless the batch message is
* removed</b></li>
* </ul>
* The default value is {@code 1000} messages and should be good for most use cases.
* @param receiverQueueSize the receiver queue size value
* @return the consumer builder instance
* @see ConsumerBuilder#receiverQueueSize(int)
*/
default ReactiveMessageConsumerBuilder<T> receiverQueueSize(Integer receiverQueueSize) {
getMutableSpec().setReceiverQueueSize(receiverQueueSize);
return this;
}
/**
* Sets the maximum total receiver queue size across partitions.
*
* <p>
* This setting is used to reduce the receiver queue size for individual partitions
* {@link #receiverQueueSize(Integer)} if the total exceeds this value (default:
* 50000). The purpose of this setting is to have an upper-limit on the number of
* messages that a consumer can be pushed at once from a broker, across all the
* partitions.
* @param maxTotalReceiverQueueSizeAcrossPartitions the maximum pending messages
* across all the partitions
* @return the consumer builder instance
* @see ConsumerBuilder#maxTotalReceiverQueueSizeAcrossPartitions(int)
*/
default ReactiveMessageConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(
Integer maxTotalReceiverQueueSizeAcrossPartitions) {
getMutableSpec().setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
return this;
}
/**
* Sets whether the consumer shall automatically subscribe to new partitions added to
* the topic. This is only for partitioned topics.
* @param autoUpdatePartitions whether to automatically subscribe to new partitions
* @return the consumer builder instance
* @see ConsumerBuilder#autoUpdatePartitions(boolean)
*/
default ReactiveMessageConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdatePartitions) {
getMutableSpec().setAutoUpdatePartitions(autoUpdatePartitions);
return this;
}
/**
* Sets the interval for checking partitions updates <i>(default: 1 minute)</i>. This
* only applies if {@link #autoUpdatePartitions} is enabled.
* @param autoUpdatePartitionsInterval the interval for checking partitions updates
* @return the consumer builder instance
* @see ConsumerBuilder#autoUpdatePartitionsInterval(int, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> autoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
getMutableSpec().setAutoUpdatePartitionsInterval(autoUpdatePartitionsInterval);
return this;
}
/**
* Sets the key reader to be used to decrypt the message payloads.
* @param cryptoKeyReader the key reader to be used to decrypt the message payloads.
* @return the consumer builder instance
* @see ConsumerBuilder#cryptoKeyReader(CryptoKeyReader)
*/
default ReactiveMessageConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
getMutableSpec().setCryptoKeyReader(cryptoKeyReader);
return this;
}
/**
* Sets the action the consumer will take in case of decryption failures.
* @param cryptoFailureAction the action the consumer will take in case of decryption
* failures
* @return the consumer builder instance
* @see ConsumerBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)
*/
default ReactiveMessageConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction cryptoFailureAction) {
getMutableSpec().setCryptoFailureAction(cryptoFailureAction);
return this;
}
/**
* Sets the maximum pending chunked messages. Consumer buffers chunk messages into
* memory until it receives all the chunks of the original message. While consuming
* chunk-messages, chunks from same message might not be contiguous in the stream and
* they might be mixed with other messages' chunks. so, consumer has to maintain
* multiple buffers to manage chunks coming from different messages. This mainly
* happens when multiple publishers are publishing messages on the topic concurrently
* or publisher failed to publish all chunks of the messages.
*
* <pre>
* eg: M1-C1, M2-C1, M1-C2, M2-C2
* Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.
* </pre> Buffering large number of outstanding uncompleted chunked messages can
* create memory pressure. It can be guarded by providing a
* {@code maxPendingChunkedMessage} threshold. Once the consumer reaches this
* threshold, it drops the outstanding non-chunked messages by silently acknowledging
* or asking the broker to redeliver later by marking it unacknowledged. This behavior
* can be controlled by setting {@link #autoAckOldestChunkedMessageOnQueueFull} The
* default value is 10.
* @param maxPendingChunkedMessage the maximum pending chunked messages.
* @return the consumer builder instance
* @see ConsumerBuilder#maxPendingChunkedMessage(int)
*/
default ReactiveMessageConsumerBuilder<T> maxPendingChunkedMessage(Integer maxPendingChunkedMessage) {
getMutableSpec().setMaxPendingChunkedMessage(maxPendingChunkedMessage);
return this;
}
/**
* Sets whether non-chunked messages are silently acknowledged when
* {@code maxPendingChunkedMessage} is reached. Buffering large number of outstanding
* uncompleted chunked messages can create memory pressure. It can be guarded by
* providing {@link #maxPendingChunkedMessage} threshold. Once the consumer reaches
* this threshold, it drops the outstanding non-chunked messages by silently
* acknowledging if autoAckOldestChunkedMessageOnQueueFull is true or else it marks
* them for redelivery. Defaults to false.
* @param autoAckOldestChunkedMessageOnQueueFull whether non-chunked messages are
* silently acknowledged
* @return the consumer builder instance
* @see ConsumerBuilder#autoAckOldestChunkedMessageOnQueueFull(boolean)
*/
default ReactiveMessageConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(
boolean autoAckOldestChunkedMessageOnQueueFull) {
getMutableSpec().setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
return this;
}
/**
* Sets the duration after which incomplete chunked messages are expired (happens for
* instance if the producer fails to publish all the chunks).
* @param expireTimeOfIncompleteChunkedMessage the duration after which incomplete
* chunked messages are expired
* @return the consumer builder instance
* @see ConsumerBuilder#expireTimeOfIncompleteChunkedMessage(long, TimeUnit)
*/
default ReactiveMessageConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(
Duration expireTimeOfIncompleteChunkedMessage) {
getMutableSpec().setExpireTimeOfIncompleteChunkedMessage(expireTimeOfIncompleteChunkedMessage);
return this;
}
/**
* Builds the reactive message consumer.
* @return the reactive message consumer
*/
ReactiveMessageConsumer<T> build();
}