blob: f644c6a18398fb2fb9b3fa69735109594d928283 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicConsumerBuilder;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.FutureUtil;
@Getter(AccessLevel.PUBLIC)
public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private final PulsarClientImpl client;
private ConsumerConfigurationData<T> conf;
private final Schema<T> schema;
private List<ConsumerInterceptor<T>> interceptorList;
private static final long MIN_ACK_TIMEOUT_MILLIS = 1000;
private static final long MIN_TICK_TIME_MILLIS = 100;
public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
this(client, new ConsumerConfigurationData<T>(), schema);
}
ConsumerBuilderImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, Schema<T> schema) {
checkArgument(schema != null, "Schema should not be null.");
this.client = client;
this.conf = conf;
this.schema = schema;
}
@Override
public ConsumerBuilder<T> loadConf(Map<String, Object> config) {
this.conf = ConfigurationDataUtils.loadData(config, conf, ConsumerConfigurationData.class);
return this;
}
@Override
public ConsumerBuilder<T> clone() {
return new ConsumerBuilderImpl<>(client, conf.clone(), schema);
}
@Override
public Consumer<T> subscribe() throws PulsarClientException {
try {
return subscribeAsync().get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public CompletableFuture<Consumer<T>> subscribeAsync() {
if (conf.getTopicNames().isEmpty() && conf.getTopicsPattern() == null) {
return FutureUtil
.failedFuture(new InvalidConfigurationException("Topic name must be set on the consumer builder"));
}
if (StringUtils.isBlank(conf.getSubscriptionName())) {
return FutureUtil.failedFuture(
new InvalidConfigurationException("Subscription name must be set on the consumer builder"));
}
if (conf.getKeySharedPolicy() != null && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
return FutureUtil.failedFuture(
new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
}
CompletableFuture<Void> applyDLQConfig;
if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
+ RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
+ RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
|| StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata =
client.getPartitionedTopicMetadata(oldRetryLetterTopic);
CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata =
client.getPartitionedTopicMetadata(oldDeadLetterTopic);
applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
.thenAccept(__ -> {
String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+ RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+ RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
if (retryLetterTopicMetadata.join().partitions > 0) {
retryLetterTopic = oldRetryLetterTopic;
}
if (deadLetterTopicMetadata.join().partitions > 0) {
deadLetterTopic = oldDeadLetterTopic;
}
if (deadLetterPolicy == null) {
conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
.retryLetterTopic(retryLetterTopic)
.deadLetterTopic(deadLetterTopic)
.build());
} else {
if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
}
if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
}
}
conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
});
} else {
conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
applyDLQConfig = CompletableFuture.completedFuture(null);
}
} else {
applyDLQConfig = CompletableFuture.completedFuture(null);
}
return applyDLQConfig.thenCompose(__ -> {
if (interceptorList == null || interceptorList.size() == 0) {
return client.subscribeAsync(conf, schema, null);
} else {
return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
}
});
}
@Override
public ConsumerBuilder<T> topic(String... topicNames) {
checkArgument(topicNames != null && topicNames.length > 0,
"Passed in topicNames should not be null or empty.");
return topics(Arrays.stream(topicNames).collect(Collectors.toList()));
}
@Override
public ConsumerBuilder<T> topics(List<String> topicNames) {
checkArgument(topicNames != null && !topicNames.isEmpty(),
"Passed in topicNames list should not be null or empty.");
topicNames.stream().forEach(topicName ->
checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
conf.getTopicNames().addAll(topicNames.stream().map(StringUtils::trim).collect(Collectors.toList()));
return this;
}
@Override
public ConsumerBuilder<T> topicsPattern(Pattern topicsPattern) {
checkArgument(conf.getTopicsPattern() == null && !topicsPattern.pattern().isEmpty(),
"Pattern has already been set or is empty.");
conf.setTopicsPattern(topicsPattern);
return this;
}
@Override
public ConsumerBuilder<T> topicsPattern(String topicsPattern) {
checkArgument(StringUtils.isNotEmpty(topicsPattern), "topicsPattern should not be null or empty");
return topicsPattern(Pattern.compile(topicsPattern));
}
@Override
public ConsumerBuilder<T> subscriptionName(String subscriptionName) {
checkArgument(StringUtils.isNotBlank(subscriptionName), "subscriptionName cannot be blank");
conf.setSubscriptionName(subscriptionName);
return this;
}
@Override
public ConsumerBuilder<T> subscriptionProperties(Map<String, String> subscriptionProperties) {
checkArgument(subscriptionProperties != null, "subscriptionProperties cannot be null");
conf.setSubscriptionProperties(Collections.unmodifiableMap(subscriptionProperties));
return this;
}
@Override
public ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit) {
checkArgument(ackTimeout == 0 || timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
"Ack timeout should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
return this;
}
@Override
public ConsumerBuilder<T> isAckReceiptEnabled(boolean isAckReceiptEnabled) {
conf.setAckReceiptEnabled(isAckReceiptEnabled);
return this;
}
@Override
public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit) {
checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
"Ack timeout tick time should be greater than " + MIN_TICK_TIME_MILLIS + " ms");
conf.setTickDurationMillis(timeUnit.toMillis(tickTime));
return this;
}
@Override
public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 0");
conf.setNegativeAckRedeliveryDelayMicros(timeUnit.toMicros(redeliveryDelay));
return this;
}
@Override
public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType subscriptionType) {
conf.setSubscriptionType(subscriptionType);
return this;
}
@Override
public ConsumerBuilder<T> subscriptionMode(@NonNull SubscriptionMode subscriptionMode) {
conf.setSubscriptionMode(subscriptionMode);
return this;
}
@Override
public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T> messageListener) {
conf.setMessageListener(messageListener);
return this;
}
@Override
public ConsumerBuilder<T> consumerEventListener(@NonNull ConsumerEventListener consumerEventListener) {
conf.setConsumerEventListener(consumerEventListener);
return this;
}
@Override
public ConsumerBuilder<T> cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
@Override
public ConsumerBuilder<T> defaultCryptoKeyReader(String privateKey) {
checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build());
}
@Override
public ConsumerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> privateKeys) {
checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build());
}
@Override
public ConsumerBuilder<T> messageCrypto(@NonNull MessageCrypto messageCrypto) {
conf.setMessageCrypto(messageCrypto);
return this;
}
@Override
public ConsumerBuilder<T> cryptoFailureAction(@NonNull ConsumerCryptoFailureAction action) {
conf.setCryptoFailureAction(action);
return this;
}
@Override
public ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
@Override
public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) {
checkArgument(delay >= 0, "acknowledgmentGroupTime needs to be >= 0");
conf.setAcknowledgementsGroupTimeMicros(unit.toMicros(delay));
return this;
}
@Override
public ConsumerBuilder<T> maxAcknowledgmentGroupSize(int messageNum) {
checkArgument(messageNum > 0, "acknowledgementsGroupSize needs to be > 0");
conf.setMaxAcknowledgmentGroupSize(messageNum);
return this;
}
@Override
public ConsumerBuilder<T> consumerName(String consumerName) {
checkArgument(StringUtils.isNotBlank(consumerName), "consumerName cannot be blank");
conf.setConsumerName(consumerName);
return this;
}
@Override
public ConsumerBuilder<T> priorityLevel(int priorityLevel) {
checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0");
conf.setPriorityLevel(priorityLevel);
return this;
}
@Override
public ConsumerBuilder<T> maxPendingChuckedMessage(int maxPendingChuckedMessage) {
conf.setMaxPendingChunkedMessage(maxPendingChuckedMessage);
return this;
}
@Override
public ConsumerBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage) {
conf.setMaxPendingChunkedMessage(maxPendingChunkedMessage);
return this;
}
@Override
public ConsumerBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull) {
conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
return this;
}
@Override
public ConsumerBuilder<T> property(String key, String value) {
checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
"property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
public ConsumerBuilder<T> properties(@NonNull Map<String, String> properties) {
properties.entrySet().forEach(entry ->
checkArgument(
StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()),
"properties' key/value cannot be blank"));
conf.getProperties().putAll(properties);
return this;
}
@Override
public ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= 0,
"maxTotalReceiverQueueSizeAcrossPartitions needs to be >= 0");
conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
return this;
}
@Override
public ConsumerBuilder<T> readCompacted(boolean readCompacted) {
conf.setReadCompacted(readCompacted);
return this;
}
@Override
public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int periodInMinutes) {
checkArgument(periodInMinutes >= 0, "periodInMinutes needs to be >= 0");
patternAutoDiscoveryPeriod(periodInMinutes, TimeUnit.MINUTES);
return this;
}
@Override
public ConsumerBuilder<T> patternAutoDiscoveryPeriod(int interval, TimeUnit unit) {
checkArgument(interval >= 0, "interval needs to be >= 0");
int intervalSeconds = (int) unit.toSeconds(interval);
conf.setPatternAutoDiscoveryPeriod(intervalSeconds);
return this;
}
@Override
public ConsumerBuilder<T> subscriptionInitialPosition(@NonNull SubscriptionInitialPosition
subscriptionInitialPosition) {
conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
return this;
}
@Override
public ConsumerBuilder<T> subscriptionTopicsMode(@NonNull RegexSubscriptionMode mode) {
conf.setRegexSubscriptionMode(mode);
return this;
}
@Override
public ConsumerBuilder<T> replicateSubscriptionState(boolean replicateSubscriptionState) {
conf.setReplicateSubscriptionState(replicateSubscriptionState);
return this;
}
@Override
public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors) {
if (interceptorList == null) {
interceptorList = new ArrayList<>();
}
interceptorList.addAll(Arrays.asList(interceptors));
return this;
}
@Override
public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
if (deadLetterPolicy != null) {
checkArgument(deadLetterPolicy.getMaxRedeliverCount() > 0, "MaxRedeliverCount must be > 0.");
}
conf.setDeadLetterPolicy(deadLetterPolicy);
return this;
}
@Override
public ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate) {
conf.setAutoUpdatePartitions(autoUpdate);
return this;
}
@Override
public ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
return this;
}
@Override
public ConsumerBuilder<T> startMessageIdInclusive() {
conf.setResetIncludeHead(true);
return this;
}
public ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy) {
checkArgument(batchReceivePolicy != null, "batchReceivePolicy must not be null.");
batchReceivePolicy.verify();
conf.setBatchReceivePolicy(batchReceivePolicy);
return this;
}
@Override
public String toString() {
return conf != null ? conf.toString() : "";
}
@Override
public ConsumerBuilder<T> keySharedPolicy(KeySharedPolicy keySharedPolicy) {
keySharedPolicy.validate();
conf.setKeySharedPolicy(keySharedPolicy);
return this;
}
@Override
public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
conf.setRetryEnable(retryEnable);
return this;
}
@Override
public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled) {
conf.setBatchIndexAckEnabled(batchIndexAcknowledgmentEnabled);
return this;
}
@Override
public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
return this;
}
@Override
public ConsumerBuilder<T> poolMessages(boolean poolMessages) {
conf.setPoolMessages(poolMessages);
return this;
}
@Override
public ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) {
conf.setPayloadProcessor(payloadProcessor);
return this;
}
@Override
public ConsumerBuilder<T> negativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff) {
checkArgument(negativeAckRedeliveryBackoff != null, "negativeAckRedeliveryBackoff must not be null.");
conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
return this;
}
@Override
public ConsumerBuilder<T> ackTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
checkArgument(ackTimeoutRedeliveryBackoff != null, "ackTimeoutRedeliveryBackoff must not be null.");
conf.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
return this;
}
@Override
public ConsumerBuilder<T> startPaused(boolean paused) {
conf.setStartPaused(paused);
return this;
}
@Override
public ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled) {
conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
return this;
}
@Override
public TopicConsumerBuilder<T> topicConfiguration(String topicName) {
TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf);
conf.getTopicConfigurations().add(topicConf);
return new TopicConsumerBuilderImpl<>(this, topicConf);
}
@Override
public ConsumerBuilder<T> topicConfiguration(String topicName,
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
builderConsumer.accept(topicConfiguration(topicName));
return this;
}
@Override
public TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern) {
TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicsPattern(topicsPattern, conf);
conf.getTopicConfigurations().add(topicConf);
return new TopicConsumerBuilderImpl<>(this, topicConf);
}
@Override
public ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
builderConsumer.accept(topicConfiguration(topicsPattern));
return this;
}
}