blob: 1d9aa490e716fa135d8b497cba129dae6b0b5dfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.pulsar;
import java.util.concurrent.TimeUnit;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition;
import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.LATEST;
import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
@UriParams
public class PulsarConfiguration implements Cloneable {
@UriParam(label = "common")
private String serviceUrl;
@UriParam(label = "common")
private String authenticationClass;
@UriParam(label = "common")
private String authenticationParams;
@UriParam(label = "consumer")
private boolean topicsPattern;
@UriParam(label = "consumer", defaultValue = "PersistentOnly")
private RegexSubscriptionMode subscriptionTopicsMode;
@UriParam(label = "consumer", defaultValue = "subs")
private String subscriptionName = "subs";
@UriParam(label = "consumer", defaultValue = "EXCLUSIVE")
private SubscriptionType subscriptionType = EXCLUSIVE;
@UriParam(label = "consumer", defaultValue = "1")
private int numberOfConsumers = 1;
@UriParam(label = "consumer", defaultValue = "10")
private int consumerQueueSize = 10;
@UriParam(label = "consumer", defaultValue = "sole-consumer")
private String consumerName = "sole-consumer";
@UriParam(label = "producer")
private String producerName;
@UriParam(label = "consumer", defaultValue = "cons")
private String consumerNamePrefix = "cons";
@UriParam(label = "consumer", defaultValue = "false")
private boolean allowManualAcknowledgement;
@UriParam(label = "consumer", defaultValue = "10000")
private long ackTimeoutMillis = 10000;
@UriParam(label = "consumer", defaultValue = "60000000")
private long negativeAckRedeliveryDelayMicros = 60000000;
@UriParam(label = "consumer", description = "RedeliveryBackoff to use for ack timeout redelivery backoff.")
private RedeliveryBackoff ackTimeoutRedeliveryBackoff;
@UriParam(label = "consumer", description = "RedeliveryBackoff to use for negative ack redelivery backoff.")
private RedeliveryBackoff negativeAckRedeliveryBackoff;
@UriParam(label = "consumer", defaultValue = "100")
private long ackGroupTimeMillis = 100;
@UriParam(label = "consumer", defaultValue = "LATEST")
private SubscriptionInitialPosition subscriptionInitialPosition = LATEST;
@UriParam(label = "consumer", defaultValue = "false")
private boolean readCompacted;
@UriParam(label = "consumer",
description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created")
private Integer maxRedeliverCount;
@UriParam(label = "consumer",
description = "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, default topic name will be topicName-subscriptionName-DLQ")
private String deadLetterTopic;
@UriParam(label = "consumer", defaultValue = "true",
description = "Whether to use the `messageListener` interface, or to receive messages using a separate thread pool")
private boolean messageListener = true;
@UriParam(label = "consumer", defaultValue = "1",
description = "Number of threads to receive and handle messages when using a separate thread pool")
private int numberOfConsumerThreads = 1;
@UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
private int sendTimeoutMs = 30000;
@UriParam(label = "producer",
description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError",
defaultValue = "false")
private boolean blockIfQueueFull;
@UriParam(label = "producer",
description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true",
defaultValue = "1000")
private int maxPendingMessages = 1000;
@UriParam(label = "producer",
description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
+ "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.",
defaultValue = "50000")
private int maxPendingMessagesAcrossPartitions = 50000;
@UriParam(label = "producer",
description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.",
defaultValue = "1000")
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
@UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000")
private int batchingMaxMessages = 1000;
@UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.",
defaultValue = "true")
private boolean batchingEnabled = true;
@UriParam(label = "producer", description = "Control batching method used by the producer.", defaultValue = "DEFAULT")
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
@UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId 1.",
defaultValue = "-1")
private long initialSequenceId = -1;
@UriParam(label = "producer", description = "Compression type to use", defaultValue = "NONE")
private CompressionType compressionType = CompressionType.NONE;
@UriParam(label = "producer", description = "Message Routing Mode to use", defaultValue = "RoundRobinPartition")
private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
@UriParam(label = "producer", description = "Custom Message Router to use")
private MessageRouter messageRouter;
/**
* Returns a copy of this configuration
*/
public PulsarConfiguration copy() {
try {
PulsarConfiguration copy = (PulsarConfiguration) clone();
return copy;
} catch (CloneNotSupportedException e) {
throw new RuntimeCamelException(e);
}
}
public boolean isTopicsPattern() {
return topicsPattern;
}
/**
* Whether the topic is a pattern (regular expression) that allows the consumer to subscribe to all matching topics
* in the namespace
*/
public void setTopicsPattern(boolean topicsPattern) {
this.topicsPattern = topicsPattern;
}
public RegexSubscriptionMode getSubscriptionTopicsMode() {
return subscriptionTopicsMode;
}
/**
* Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used
* with pattern subscriptions.
*/
public void setSubscriptionTopicsMode(RegexSubscriptionMode subscriptionTopicsMode) {
this.subscriptionTopicsMode = subscriptionTopicsMode;
}
public String getSubscriptionName() {
return subscriptionName;
}
/**
* Name of the subscription to use
*/
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public SubscriptionType getSubscriptionType() {
return subscriptionType;
}
/**
* Type of the subscription [EXCLUSIVE|SHARED|FAILOVER|KEY_SHARED], defaults to EXCLUSIVE
*/
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
public int getNumberOfConsumers() {
return numberOfConsumers;
}
/**
* Number of consumers - defaults to 1
*/
public void setNumberOfConsumers(int numberOfConsumers) {
this.numberOfConsumers = numberOfConsumers;
}
public int getConsumerQueueSize() {
return consumerQueueSize;
}
/**
* Size of the consumer queue - defaults to 10
*/
public void setConsumerQueueSize(int consumerQueueSize) {
this.consumerQueueSize = consumerQueueSize;
}
public String getConsumerName() {
return consumerName;
}
/**
* Name of the consumer when subscription is EXCLUSIVE
*/
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public String getProducerName() {
return producerName;
}
/**
* Name of the producer. If unset, lets Pulsar select a unique identifier.
*/
public void setProducerName(String producerName) {
this.producerName = producerName;
}
public String getConsumerNamePrefix() {
return consumerNamePrefix;
}
/**
* Prefix to add to consumer names when a SHARED or FAILOVER subscription is used
*/
public void setConsumerNamePrefix(String consumerNamePrefix) {
this.consumerNamePrefix = consumerNamePrefix;
}
public boolean isAllowManualAcknowledgement() {
return allowManualAcknowledgement;
}
/**
* Whether to allow manual message acknowledgements.
* <p/>
* If this option is enabled, then messages are not acknowledged automatically after successful route completion.
* Instead, an instance of {@link PulsarMessageReceipt} is stored as a header on the
* {@link org.apache.camel.Exchange}. Messages can then be acknowledged using {@link PulsarMessageReceipt} at any
* time before the ackTimeout occurs.
*/
public void setAllowManualAcknowledgement(boolean allowManualAcknowledgement) {
this.allowManualAcknowledgement = allowManualAcknowledgement;
}
public long getAckTimeoutMillis() {
return ackTimeoutMillis;
}
/**
* Timeout for unacknowledged messages in milliseconds - defaults to 10000
*/
public void setAckTimeoutMillis(long ackTimeoutMillis) {
this.ackTimeoutMillis = ackTimeoutMillis;
}
public long getAckGroupTimeMillis() {
return ackGroupTimeMillis;
}
/**
* Group the consumer acknowledgments for the specified time in milliseconds - defaults to 100
*/
public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
this.ackGroupTimeMillis = ackGroupTimeMillis;
}
/**
* Send timeout in milliseconds. Defaults to 30,000ms (30 seconds)
*/
public void setSendTimeoutMs(int sendTimeoutMs) {
this.sendTimeoutMs = sendTimeoutMs;
}
public int getSendTimeoutMs() {
return sendTimeoutMs;
}
/**
* Set whether the send and asyncSend operations should block when the outgoing message queue is full. If set to
* false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left in the
* pending queue. Default is false.
*/
public void setBlockIfQueueFull(boolean blockIfQueueFull) {
this.blockIfQueueFull = blockIfQueueFull;
}
public boolean isBlockIfQueueFull() {
return blockIfQueueFull;
}
/**
* Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default
* is 1000.
*/
public void setMaxPendingMessages(int maxPendingMessages) {
this.maxPendingMessages = maxPendingMessages;
}
public int getMaxPendingMessages() {
return maxPendingMessages;
}
/**
* Set the number of max pending messages across all the partitions. Default is 50000.
*/
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
}
public int getMaxPendingMessagesAcrossPartitions() {
return maxPendingMessagesAcrossPartitions;
}
/**
* Set the time period within which the messages sent will be batched if batch messages are enabled. If set to a non
* zero value, messages will be queued until either:
* <ul>
* <li>this time interval expires</li>
* <li>the max number of messages in a batch is reached
* </ul>
* Default is 1ms.
*/
public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) {
this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros;
}
public long getBatchingMaxPublishDelayMicros() {
return batchingMaxPublishDelayMicros;
}
/**
* Set the maximum number of messages permitted in a batch. Default 1,000.
*/
public void setBatchingMaxMessages(int batchingMaxMessages) {
this.batchingMaxMessages = batchingMaxMessages;
}
public int getBatchingMaxMessages() {
return batchingMaxMessages;
}
/**
* Control whether automatic batching of messages is enabled for the producer. Default is true.
*/
public void setBatchingEnabled(boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
}
public boolean isBatchingEnabled() {
return batchingEnabled;
}
/**
* Control batching method of the Pulsar producer. KEY_BASED batches based on the Pulsar message key. DEFAULT
* batches all messages together regardless of key; this may cause only a single consumer to work when consuming
* using a KEY_SHARED subscription. Default is DEFAULT.
*/
public void setBatcherBuilder(BatcherBuilder batcherBuilder) {
this.batcherBuilder = batcherBuilder;
}
public BatcherBuilder getBatcherBuilder() {
return batcherBuilder;
}
/**
* Control the initial position in the topic of a newly created subscription. Default is latest message.
*/
public void setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
this.subscriptionInitialPosition = subscriptionInitialPosition;
}
public SubscriptionInitialPosition getSubscriptionInitialPosition() {
return subscriptionInitialPosition;
}
/**
* Enable compacted topic reading.
*/
public boolean isReadCompacted() {
return readCompacted;
}
public void setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
}
/**
* Set the baseline for the sequence ids for messages published by the producer. First message will be using
* (initialSequenceId 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if
* not otherwise specified.
*/
public void setInitialSequenceId(long initialSequenceId) {
this.initialSequenceId = initialSequenceId;
}
public long getInitialSequenceId() {
return initialSequenceId;
}
/**
* Set the compression type for the producer.
*/
public void setCompressionType(String compressionType) {
this.compressionType = CompressionType.valueOf(compressionType.toUpperCase());
}
/**
* Set the compression type for the producer.
*/
public void setCompressionType(CompressionType compressionType) {
this.compressionType = compressionType;
}
public CompressionType getCompressionType() {
return compressionType;
}
/**
* Set the message routing mode for the producer.
*/
public MessageRoutingMode getMessageRoutingMode() {
return messageRoutingMode;
}
public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
this.messageRoutingMode = messageRoutingMode;
}
/**
* Set a custom Message Router.
*/
public MessageRouter getMessageRouter() {
return messageRouter;
}
public void setMessageRouter(MessageRouter messageRouter) {
this.messageRouter = messageRouter;
}
public long getNegativeAckRedeliveryDelayMicros() {
return negativeAckRedeliveryDelayMicros;
}
/**
* Set the negative acknowledgement delay
*/
public void setNegativeAckRedeliveryDelayMicros(long negativeAckRedeliveryDelayMicros) {
this.negativeAckRedeliveryDelayMicros = negativeAckRedeliveryDelayMicros;
}
public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
return ackTimeoutRedeliveryBackoff;
}
/**
* Set a RedeliveryBackoff to use for ack timeout redelivery backoff.
*/
public void setAckTimeoutRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff) {
this.ackTimeoutRedeliveryBackoff = redeliveryBackoff;
}
public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
return negativeAckRedeliveryBackoff;
}
/**
* Set a RedeliveryBackoff to use for negative ack redelivery backoff.
*/
public void setNegativeAckRedeliveryBackoff(RedeliveryBackoff redeliveryBackoff) {
this.negativeAckRedeliveryBackoff = redeliveryBackoff;
}
public Integer getMaxRedeliverCount() {
return maxRedeliverCount;
}
public void setMaxRedeliverCount(Integer maxRedeliverCount) {
this.maxRedeliverCount = maxRedeliverCount;
}
public String getDeadLetterTopic() {
return deadLetterTopic;
}
public void setDeadLetterTopic(String deadLetterTopic) {
this.deadLetterTopic = deadLetterTopic;
}
public boolean isMessageListener() {
return messageListener;
}
public void setMessageListener(boolean messageListener) {
this.messageListener = messageListener;
}
public int getNumberOfConsumerThreads() {
return numberOfConsumerThreads;
}
/**
* Number of consumers threads - defaults to 1
*/
public void setNumberOfConsumerThreads(int numberOfConsumerThreads) {
this.numberOfConsumerThreads = numberOfConsumerThreads;
}
public String getServiceUrl() {
return serviceUrl;
}
/**
* The Pulsar Service URL to point while creating the client from URI
*/
public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}
public String getAuthenticationClass() {
return authenticationClass;
}
/**
* The Authentication FQCN to be used while creating the client from URI
*/
public void setAuthenticationClass(String authenticationClass) {
this.authenticationClass = authenticationClass;
}
public String getAuthenticationParams() {
return authenticationParams;
}
/**
* The Authentication Parameters to be used while creating the client from URI
*/
public void setAuthenticationParams(String authenticationParams) {
this.authenticationParams = authenticationParams;
}
}