blob: e199233a7891361de075fd92c3618055d370971c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.api;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.reactivestreams.Publisher;
/**
* Builder interface for {@link ReactiveMessageSender}.
*
* @param <T> the message payload type
*/
public interface ReactiveMessageSenderBuilder<T> {
/**
* Sets the cache to use for the sender.
* @param producerCache the cache to set
* @return the sender builder instance
*/
ReactiveMessageSenderBuilder<T> cache(ReactiveMessageSenderCache producerCache);
/**
* Sets the maximum number of in-flight messages for the sender. When this value is
* reached, backpressure will be triggered on the
* {@link ReactiveMessageSender#sendOne(MessageSpec)}/{@link ReactiveMessageSender#sendMany(Publisher)}
* operations. Note that the maxInflight setting applies globally for all the
* operations called on the sender.
* @param maxInflight the maximum number of in-flight messages for the sender.
* @return the sender builder instance
*/
ReactiveMessageSenderBuilder<T> maxInflight(int maxInflight);
/**
* Sets the maximum number of concurrent subscriptions for the sender. Note that the
* maxConcurrentSenderSubscriptions setting applies globally for all the operations
* called on the sender.
* @param maxConcurrentSenderSubscriptions the maximum number of concurrent
* subscriptions for the sender.
* @return the sender builder instance
*/
ReactiveMessageSenderBuilder<T> maxConcurrentSenderSubscriptions(int maxConcurrentSenderSubscriptions);
/**
* Creates and returns a copy of this reactive sender builder.
* @return the cloned reactive reader builder
*/
ReactiveMessageSenderBuilder<T> clone();
/**
* Applies a sender spec to configure the sender.
* @param senderSpec the sender spec to apply
* @return the sender builder instance
*/
default ReactiveMessageSenderBuilder<T> applySpec(ReactiveMessageSenderSpec senderSpec) {
getMutableSpec().applySpec(senderSpec);
return this;
}
/**
* Converts this builder to an immutable reactive sender spec with the same settings.
* @return the reactive sender spec.
*/
default ReactiveMessageSenderSpec toImmutableSpec() {
return new ImmutableReactiveMessageSenderSpec(getMutableSpec());
}
/**
* Converts this builder to a mutable reactive sender spec with the same settings.
* @return the reactive sender spec.
*/
MutableReactiveMessageSenderSpec getMutableSpec();
/**
* Sets the topic this sender will be publishing on.
*
* <p>
* This argument is required when constructing the sender.
* @param topicName the name of the topic
* @return the sender builder instance
* @see ProducerBuilder#topic(String)
*/
default ReactiveMessageSenderBuilder<T> topic(String topicName) {
getMutableSpec().setTopicName(topicName);
return this;
}
/**
* Sets a name for the producers created under this sender.
*
* <p>
* If not assigned, the system will generate a globally unique name to each producer.
*
* <p>
* <b>Warning</b>: When specifying a name, it is up to the user to ensure that, for a
* given topic, the producer name is unique across all Pulsar's clusters. Brokers will
* enforce that only a single producer given name can be publishing on a topic.
* @param producerName the name to use for the producer
* @return the sender builder instance
* @see ProducerBuilder#producerName(String)
*/
default ReactiveMessageSenderBuilder<T> producerName(String producerName) {
getMutableSpec().setProducerName(producerName);
return this;
}
/**
* Sets the send timeout <i>(default: 30 seconds)</i> for this sender.
*
* <p>
* If a message is not acknowledged by the server before the sendTimeout expires, an
* error will be reported.
*
* <p>
* Setting the timeout to zero, for example {@code setTimeout(Duration.ZERO)} will set
* the timeout to infinity, which can be useful when using Pulsar's message
* deduplication feature, since the client library will retry forever to publish a
* message. No errors will be propagated back to the application.
* @param sendTimeout the send timeout to set
* @return the sender builder instance
* @see ProducerBuilder#sendTimeout(int, TimeUnit)
*/
default ReactiveMessageSenderBuilder<T> sendTimeout(Duration sendTimeout) {
getMutableSpec().setSendTimeout(sendTimeout);
return this;
}
/**
* Sets the maximum size of the queues holding the messages pending to receive an
* acknowledgment from the broker. This setting applies to each producer created under
* this sender.
*
* <p>
* The producer queue size also determines the max amount of memory that will be
* required by the client application. Until the producer gets a successful
* acknowledgment back from the broker, it will keep in memory (direct memory pool)
* all the messages in the pending queue.
*
* <p>
* Default is 0, disable the pending messages check.
* @param maxPendingMessages the maximum size of the pending messages queue for the
* sender to set
* @return the sender builder instance
* @see ProducerBuilder#maxPendingMessages(int)
*/
default ReactiveMessageSenderBuilder<T> maxPendingMessages(int maxPendingMessages) {
getMutableSpec().setMaxPendingMessages(maxPendingMessages);
return this;
}
/**
* Sets the maximum number of pending messages across all the partitions. This setting
* applies to each producer created under this sender.
*
* <p>
* This setting will be used to lower the max pending messages for each partition
* ({@link #maxPendingMessages(int)}), if the total exceeds the configured value. The
* purpose of this setting is to have an upper-limit on the number of pending messages
* when publishing on a partitioned topic.
*
* <p>
* Default is 0, disable the pending messages across partitions check.
*
* <p>
* If publishing at high rate over a topic with many partitions (especially when
* publishing messages without a partitioning key), it might be beneficial to increase
* this parameter to allow for more pipelining within the individual partitions
* senders.
* @param maxPendingMessagesAcrossPartitions the maximum number of pending messages
* across all the partitions to set
* @return the sender builder instance
* @see ProducerBuilder#maxPendingMessagesAcrossPartitions(int)
*/
default ReactiveMessageSenderBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
getMutableSpec().setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
return this;
}
/**
* Sets the routing mode for a partitioned producer. This setting applies to each
* producer created under this sender.
*
* <p>
* The default routing mode is to round-robin across the available partitions.
*
* <p>
* This logic is applied when the application is not setting a key on a particular
* message. If the key is set with {@link MessageSpecBuilder#key(String)}, then the
* hash of the key will be used to select a partition for the message.
* @param messageRoutingMode the message routing mode to set
* @return the sender builder instance
* @see ProducerBuilder#messageRoutingMode(MessageRoutingMode)
*/
default ReactiveMessageSenderBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode) {
getMutableSpec().setMessageRoutingMode(messageRoutingMode);
return this;
}
/**
* Sets the hashing scheme used to choose the partition on where to publish a
* particular message.
*
* <p>
* Standard hashing functions available are:
* <ul>
* <li>{@link HashingScheme#JavaStringHash}: Java {@code String.hashCode()} (Default)
* <li>{@link HashingScheme#Murmur3_32Hash}: Use Murmur3 hashing function. <a href=
* "https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
* </ul>
* @param hashingScheme the hashing scheme to set
* @return the sender builder instance
* @see ProducerBuilder#hashingScheme(HashingScheme)
*/
default ReactiveMessageSenderBuilder<T> hashingScheme(HashingScheme hashingScheme) {
getMutableSpec().setHashingScheme(hashingScheme);
return this;
}
/**
* Sets the action the sender will take in case of encryption failures.
* @param cryptoFailureAction the action the sender will take in case of encryption
* failures to set
* @return the sender builder instance
* @see ProducerBuilder#cryptoFailureAction(ProducerCryptoFailureAction)
*/
default ReactiveMessageSenderBuilder<T> cryptoFailureAction(ProducerCryptoFailureAction cryptoFailureAction) {
getMutableSpec().setCryptoFailureAction(cryptoFailureAction);
return this;
}
/**
* Sets a custom message routing policy by passing an implementation of
* {@link MessageRouter}.
* @param messageRouter the message router to set
* @return the sender builder instance
* @see ProducerBuilder#messageRouter(MessageRouter)
*/
default ReactiveMessageSenderBuilder<T> messageRouter(MessageRouter messageRouter) {
getMutableSpec().setMessageRouter(messageRouter);
return this;
}
/**
* Sets the time period within which the messages sent will be batched <i>default: 1
* ms</i> if batched messages are enabled. If set to a non-zero value, messages will
* be queued until either:
* <ul>
* <li>this time interval expires</li>
* <li>the maximum number of messages in a batch is reached
* ({@link #batchingMaxMessages(int)})
* <li>the maximum size of a batch is reached ({@link #batchingMaxBytes(int)})
* </ul>
*
* <p>
* All messages will be published as a single batch message. The consumer will be
* delivered individual messages in the batch in the same order they were enqueued.
* @param batchingMaxPublishDelay the time period within which the messages sent will
* be batched
* @return the sender builder instance
* @see #batchingMaxMessages(int)
* @see #batchingMaxBytes(int)
* @see ProducerBuilder#batchingMaxPublishDelay(long, TimeUnit)
*/
default ReactiveMessageSenderBuilder<T> batchingMaxPublishDelay(Duration batchingMaxPublishDelay) {
getMutableSpec().setBatchingMaxPublishDelay(batchingMaxPublishDelay);
return this;
}
/**
* Sets the partition switch frequency while batching of messages is enabled and using
* round-robin routing mode for non-keyed message <i>default: 10</i>.
*
* <p>
* The time period of partition switch is
* roundRobinRouterBatchingPartitionSwitchFrequency *
* {@link #batchingMaxPublishDelay}. During this period, all messages that arrive will
* be routed to the same partition.
* @param roundRobinRouterBatchingPartitionSwitchFrequency the frequency of partition
* switch
* @return the sender builder instance
* @see #messageRoutingMode(MessageRoutingMode)
* @see #batchingMaxPublishDelay(Duration)
* @see ProducerBuilder#roundRobinRouterBatchingPartitionSwitchFrequency(int)
*/
default ReactiveMessageSenderBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(
int roundRobinRouterBatchingPartitionSwitchFrequency) {
getMutableSpec()
.setRoundRobinRouterBatchingPartitionSwitchFrequency(roundRobinRouterBatchingPartitionSwitchFrequency);
return this;
}
/**
* Sets the maximum number of messages permitted in a batch. <i>default: 1000</i> If
* set to a value greater than 1, messages will be batched until this threshold or the
* maximum byte size of a batch is reached or the batch publish delay has elapsed.
*
* <p>
* All messages in a batch will be published as a single batch message. The consumer
* will be delivered individual messages in the batch in the same order they were
* enqueued.
* @param batchingMaxMessages the maximum number of messages in a batch to set
* @return the sender builder instance
* @see #batchingMaxPublishDelay(Duration)
* @see #batchingMaxBytes(int)
* @see ProducerBuilder#batchingMaxMessages(int)
*/
default ReactiveMessageSenderBuilder<T> batchingMaxMessages(int batchingMaxMessages) {
getMutableSpec().setBatchingMaxMessages(batchingMaxMessages);
return this;
}
/**
* Sets the maximum number of bytes permitted in a batch. <i>default: 128KB</i> If set
* to a value greater than 0, messages will be queued until this threshold is reached
* or other batching conditions are met.
*
* <p>
* All messages in a batch will be published as a single batched message. The consumer
* will be delivered individual messages in the batch in the same order they were
* enqueued.
* @param batchingMaxBytes the maximum number of bytes in a batch to set
* @return the sender builder instance
* @see #batchingMaxPublishDelay(Duration)
* @see #batchingMaxMessages(int)
* @see ProducerBuilder#batchingMaxBytes(int)
*/
default ReactiveMessageSenderBuilder<T> batchingMaxBytes(int batchingMaxBytes) {
getMutableSpec().setBatchingMaxBytes(batchingMaxBytes);
return this;
}
/**
* Sets whether batching of messages is enabled for the sender. <i>default:
* enabled</i>
*
* <p>
* When batching is enabled, multiple calls to
* {@link ReactiveMessageSender#sendOne(MessageSpec)}/{@link ReactiveMessageSender#sendMany(Publisher)}
* may result in a single batch to be sent to the broker, leading to better
* throughput, especially when publishing small messages. If compression is enabled,
* messages will be compressed at the batch level, leading to a much better
* compression ratio for similar headers or contents.
*
* <p>
* When enabled default batch delay is set to 1 ms and default batch size is 1000
* messages
*
* <p>
* Batching is enabled by default since 2.0.0.
* @param batchingEnabled whether batching is enabled
* @return the sender builder instance
* @see #batchingMaxPublishDelay(Duration)
* @see #batchingMaxMessages(int)
* @see ProducerBuilder#enableBatching(boolean)
*/
default ReactiveMessageSenderBuilder<T> batchingEnabled(boolean batchingEnabled) {
getMutableSpec().setBatchingEnabled(batchingEnabled);
return this;
}
/**
* Sets the batcher builder of the sender. The sender will use the batcher builder to
* build a batch message container.This is only used when batching is enabled.
* @param batcherBuilder the batcher builder to set
* @return the sender builder instance
* @see ProducerBuilder#batcherBuilder(BatcherBuilder)
*/
default ReactiveMessageSenderBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder) {
getMutableSpec().setBatcherBuilder(batcherBuilder);
return this;
}
/**
* Sets whether chunking of messages is enabled If enabled, when the message size is
* higher than the maximum allowed publishing payload size on the broker, then the
* sender will split the message into multiple chunks and publish them to the broker
* separately and in order. The consumer will stitch them together to form the
* original published message.This allows clients to publish large messages.
*
* <p>
* Recommendations to use this feature:
*
* <pre>
* 1. Chunking is only supported by non-shared subscriptions and persistent-topic.
* 2. Batching shouldn't be used together with chunking.
* 3. The {@link ReactivePulsarClient} keeps published messages into a buffer until it receives the acknowledgements from the broker.
* So, it's better to reduce the {@link #maxPendingMessages} size to prevent the sender occupying large amount
* of memory from these buffered messages.
* 4. Set the message TTL on the namespace to cleanup incomplete chunked messages.
* (sometimes, due to broker-restart or publish timeout, the sender might fail to publish an entire large message.
* So the consumer will not be able to consume and acknowledge those messages. So, those messages can
* only be discarded by message TTL) Or configure
* {@link ReactiveMessageConsumerBuilder#expireTimeOfIncompleteChunkedMessage}
* 5. Consumer configuration: the consumer should also configure {@link ReactiveMessageConsumerBuilder#receiverQueueSize} and {@link ReactiveMessageConsumerBuilder#maxPendingChunkedMessage}
* </pre>
* @param chunkingEnabled whether to enable chunking
* @return the sender builder instance
* @see ProducerBuilder#enableChunking(boolean)
*/
default ReactiveMessageSenderBuilder<T> chunkingEnabled(boolean chunkingEnabled) {
getMutableSpec().setChunkingEnabled(chunkingEnabled);
return this;
}
/**
* Sets the key reader to be used to encrypt the message payloads.
* @param cryptoKeyReader the key reader to be used to encrypt the message payloads.
* @return the sender builder instance
* @see ProducerBuilder#cryptoKeyReader(CryptoKeyReader)
*/
default ReactiveMessageSenderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
getMutableSpec().setCryptoKeyReader(cryptoKeyReader);
return this;
}
/**
* Sets the public encryption key names, used by the producer to encrypt the data key.
*
* <p>
* At the time of producer creation, the Pulsar client checks if there are keys added
* to encryptionKeys. If keys are found, a callback
* {@link CryptoKeyReader#getPrivateKey(String, Map)} and
* {@link CryptoKeyReader#getPublicKey(String, Map)} is invoked against each key to
* load the values of the key. Applications should implement this callback to return
* the key in pkcs8 format. If compression is enabled, the message is encrypted after
* compression. If batch messaging is enabled, the batched message is encrypted.
* @param encryptionKeys the names of the encryption keys in the key store
* @return the sender builder instance
* @see ProducerBuilder#addEncryptionKey(String)
*/
default ReactiveMessageSenderBuilder<T> encryptionKeys(Set<String> encryptionKeys) {
getMutableSpec().setEncryptionKeys(encryptionKeys);
return this;
}
/**
* Sets the compression type for the sender.
*
* <p>
* By default, message payloads are not compressed. Supported compression types are:
* <ul>
* <li>{@link CompressionType#NONE}: No compression (Default)</li>
* <li>{@link CompressionType#LZ4}: Compress with LZ4 algorithm. Faster but lower
* compression than ZLib</li>
* <li>{@link CompressionType#ZLIB}: Standard ZLib compression</li>
* <li>{@link CompressionType#ZSTD}: Compress with Zstandard codec. Since Pulsar 2.3.
* Zstd cannot be used if consumer applications are not in version &gt;= 2.3 as
* well</li>
* <li>{@link CompressionType#SNAPPY} Compress with Snappy codec. Since Pulsar 2.4.
* Snappy cannot be used if consumer applications are not in version &gt;= 2.4 as
* well</li>
* </ul>
* @param compressionType the compression type to set
* @return the sender builder instance
* @see ProducerBuilder#compressionType(CompressionType)
*/
default ReactiveMessageSenderBuilder<T> compressionType(CompressionType compressionType) {
getMutableSpec().setCompressionType(compressionType);
return this;
}
/**
* Sets the baseline for the sequence ids for messages published by the producer. This
* setting applies to each producer created under this sender.
*
* <p>
* First message will be using {@code (initialSequenceId + 1)} as its sequence id and
* subsequent messages will be assigned incremental sequence ids, if not otherwise
* specified.
* @param initialSequenceId the initial sequence id for the producer to set
* @return the sender builder instance
* @see ProducerBuilder#initialSequenceId(long)
*/
default ReactiveMessageSenderBuilder<T> initialSequenceId(long initialSequenceId) {
getMutableSpec().setInitialSequenceId(initialSequenceId);
return this;
}
/**
* If enabled, the sender will automatically discover new partitions of partitioned
* topics at runtime.
*
* <p>
* Default is true.
* @param autoUpdatePartitions whether to auto discover the partition configuration
* changes
* @return the sender builder instance
* @see ProducerBuilder#autoUpdatePartitions(boolean)
*/
default ReactiveMessageSenderBuilder<T> autoUpdatePartitions(boolean autoUpdatePartitions) {
getMutableSpec().setAutoUpdatePartitions(autoUpdatePartitions);
return this;
}
/**
* Sets the interval of partitions updates <i>(default: 1 minute)</i>. This only works
* if {@link #autoUpdatePartitions} is enabled.
* @param autoUpdatePartitionsInterval the interval of partitions updates
* @return the sender builder instance
* @see ProducerBuilder#autoUpdatePartitionsInterval(int, TimeUnit)
*/
default ReactiveMessageSenderBuilder<T> autoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
getMutableSpec().setAutoUpdatePartitionsInterval(autoUpdatePartitionsInterval);
return this;
}
/**
* Sets whether to enable the multiple schema mode for the producer. If enabled, the
* producer can send a message with a different schema from the one specified when it
* was created, otherwise an invalid message exception would be thrown.
*
* <p>
* Enabled by default.
* @param multiSchema whether to enable or disable multiple schema mode
* @return the sender builder instance
* @see ProducerBuilder#enableMultiSchema(boolean)
*/
default ReactiveMessageSenderBuilder<T> multiSchema(boolean multiSchema) {
getMutableSpec().setMultiSchema(multiSchema);
return this;
}
/**
* Sets the type of access mode that the producer requires on the topic. This setting
* applies to each producer created under this sender.
*
* <p>
* Possible values are:
* <ul>
* <li>{@link ProducerAccessMode#Shared}: By default multiple producers can publish on
* a topic
* <li>{@link ProducerAccessMode#Exclusive}: Require exclusive access for producer.
* Fail immediately if there's already a producer connected.
* <li>{@link ProducerAccessMode#WaitForExclusive}: Producer creation is pending until
* it can acquire exclusive access
* </ul>
* @param accessMode the access mode to set
* @return the producer builder instance
* @see ProducerBuilder#accessMode(ProducerAccessMode)
*/
default ReactiveMessageSenderBuilder<T> accessMode(ProducerAccessMode accessMode) {
getMutableSpec().setAccessMode(accessMode);
return this;
}
/**
* Sets whether to start partitioned producers lazily. This setting applies to each
* producer created under this sender. This config affects Shared mode producers of
* partitioned topics only. It controls whether producers register and connect
* immediately to the owner broker of each partition or start lazily on demand. The
* internal producer of one partition is always started eagerly, chosen by the routing
* policy, but the internal producers of any additional partitions are started on
* demand, upon receiving their first message. Using this mode can reduce the strain
* on brokers for topics with large numbers of partitions and when the SinglePartition
* or some custom partial partition routing policy like
* PartialRoundRobinMessageRouterImpl is used without keyed messages. Because producer
* connection can be on demand, this can produce extra send latency for the first
* messages of a given partition.
* @param lazyStartPartitionedProducers whether to start partition producers lazily
* @return the sender builder instance
* @see ProducerBuilder#enableLazyStartPartitionedProducers(boolean)
*/
default ReactiveMessageSenderBuilder<T> lazyStartPartitionedProducers(boolean lazyStartPartitionedProducers) {
getMutableSpec().setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
return this;
}
/**
* Adds a property to the producer. This setting applies to each producer created
* under this sender.
*
* <p>
* Properties are application defined metadata that can be attached to the sender.
* When getting the topic stats, this metadata will be associated to the sender stats
* for easier identification.
* @param key the key of the property to add
* @param value the value of the property to add
* @return the producer builder instance
* @see ProducerBuilder#property(String, String)
*/
default ReactiveMessageSenderBuilder<T> property(String key, String value) {
if (getMutableSpec().getProperties() == null) {
getMutableSpec().setProperties(new LinkedHashMap<>());
}
getMutableSpec().getProperties().put(key, value);
return this;
}
/**
* Sets the properties to the producer. This setting applies to each producer created
* under this sender.
*
* <p>
* Properties are application defined metadata that can be attached to the sender.
* When getting the topic stats, this metadata will be associated to the sender stats
* for easier identification.
* @param properties the properties to set
* @return the sender builder instance
* @see ProducerBuilder#properties(Map)
*/
default ReactiveMessageSenderBuilder<T> properties(Map<String, String> properties) {
getMutableSpec().setProperties(properties);
return this;
}
/**
* Builds the reactive message sender.
* @return the reactive message sender
*/
ReactiveMessageSender<T> build();
}