blob: ecbdfa76b64aedfe1e1fcea895404a4bdd911571 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptorWrapper;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
@Getter(AccessLevel.PUBLIC)
public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
private final PulsarClientImpl client;
private ProducerConfigurationData conf;
private Schema<T> schema;
private List<ProducerInterceptor> interceptorList;
public ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
this(client, new ProducerConfigurationData(), schema);
}
private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData conf, Schema<T> schema) {
this.client = client;
this.conf = conf;
this.schema = schema;
}
/**
* Allow to override schema in builder implementation.
* @return
*/
public ProducerBuilder<T> schema(Schema<T> schema) {
this.schema = schema;
return this;
}
@Override
public ProducerBuilder<T> clone() {
return new ProducerBuilderImpl<>(client, conf.clone(), schema);
}
@Override
public Producer<T> create() throws PulsarClientException {
try {
return createAsync().get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public CompletableFuture<Producer<T>> createAsync() {
// config validation
checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
"Batching and chunking of messages can't be enabled together");
if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
}
try {
setMessageRoutingMode();
} catch (PulsarClientException pce) {
return FutureUtil.failedFuture(pce);
}
return interceptorList == null || interceptorList.size() == 0
? client.createProducerAsync(conf, schema, null)
: client.createProducerAsync(conf, schema, new ProducerInterceptors(interceptorList));
}
@Override
public ProducerBuilder<T> loadConf(Map<String, Object> config) {
conf = ConfigurationDataUtils.loadData(
config, conf, ProducerConfigurationData.class);
return this;
}
@Override
public ProducerBuilder<T> topic(String topicName) {
checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank");
conf.setTopicName(StringUtils.trim(topicName));
return this;
}
@Override
public ProducerBuilder<T> producerName(String producerName) {
conf.setProducerName(producerName);
return this;
}
@Override
public ProducerBuilder<T> sendTimeout(int sendTimeout, @NonNull TimeUnit unit) {
conf.setSendTimeoutMs(sendTimeout, unit);
return this;
}
@Override
public ProducerBuilder<T> maxPendingMessages(int maxPendingMessages) {
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
@Deprecated
@Override
public ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
return this;
}
@Override
public ProducerBuilder<T> accessMode(ProducerAccessMode accessMode) {
conf.setAccessMode(accessMode);
return this;
}
@Override
public ProducerBuilder<T> blockIfQueueFull(boolean blockIfQueueFull) {
conf.setBlockIfQueueFull(blockIfQueueFull);
return this;
}
@Override
public ProducerBuilder<T> messageRoutingMode(@NonNull MessageRoutingMode messageRouteMode) {
conf.setMessageRoutingMode(messageRouteMode);
return this;
}
@Override
public ProducerBuilder<T> compressionType(@NonNull CompressionType compressionType) {
conf.setCompressionType(compressionType);
return this;
}
@Override
public ProducerBuilder<T> hashingScheme(@NonNull HashingScheme hashingScheme) {
conf.setHashingScheme(hashingScheme);
return this;
}
@Override
public ProducerBuilder<T> messageRouter(@NonNull MessageRouter messageRouter) {
conf.setCustomMessageRouter(messageRouter);
return this;
}
@Override
public ProducerBuilder<T> enableBatching(boolean batchMessagesEnabled) {
conf.setBatchingEnabled(batchMessagesEnabled);
return this;
}
@Override
public ProducerBuilder<T> enableChunking(boolean chunkingEnabled) {
conf.setChunkingEnabled(chunkingEnabled);
return this;
}
@Override
public ProducerBuilder<T> chunkMaxMessageSize(int chunkMaxMessageSize) {
conf.setChunkMaxMessageSize(chunkMaxMessageSize);
return this;
}
@Override
public ProducerBuilder<T> cryptoKeyReader(@NonNull CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
@Override
public ProducerBuilder<T> defaultCryptoKeyReader(String publicKey) {
checkArgument(StringUtils.isNotBlank(publicKey), "publicKey cannot be blank");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPublicKey(publicKey).build());
}
@Override
public ProducerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> publicKeys) {
checkArgument(!publicKeys.isEmpty(), "publicKeys cannot be empty");
return cryptoKeyReader(DefaultCryptoKeyReader.builder().publicKeys(publicKeys).build());
}
@Override
public ProducerBuilder<T> messageCrypto(MessageCrypto messageCrypto) {
conf.setMessageCrypto(messageCrypto);
return this;
}
@Override
public ProducerBuilder<T> addEncryptionKey(String key) {
checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be blank");
conf.getEncryptionKeys().add(key);
return this;
}
@Override
public ProducerBuilder<T> cryptoFailureAction(@NonNull ProducerCryptoFailureAction action) {
conf.setCryptoFailureAction(action);
return this;
}
@Override
public ProducerBuilder<T> batchingMaxPublishDelay(long batchDelay, @NonNull TimeUnit timeUnit) {
conf.setBatchingMaxPublishDelayMicros(batchDelay, timeUnit);
return this;
}
@Override
public ProducerBuilder<T> roundRobinRouterBatchingPartitionSwitchFrequency(int frequency) {
conf.setBatchingPartitionSwitchFrequencyByPublishDelay(frequency);
return this;
}
@Override
public ProducerBuilder<T> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
return this;
}
@Override
public ProducerBuilder<T> batchingMaxBytes(int batchingMaxBytes) {
conf.setBatchingMaxBytes(batchingMaxBytes);
return this;
}
@Override
public ProducerBuilder<T> batcherBuilder(BatcherBuilder batcherBuilder) {
conf.setBatcherBuilder(batcherBuilder);
return this;
}
@Override
public ProducerBuilder<T> initialSequenceId(long initialSequenceId) {
conf.setInitialSequenceId(initialSequenceId);
return this;
}
@Override
public ProducerBuilder<T> property(String key, String value) {
checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),
"property key/value cannot be blank");
conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder<T> properties(@NonNull Map<String, String> properties) {
properties.entrySet().forEach(entry ->
checkArgument(StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue()),
"properties' key/value cannot be blank"));
conf.getProperties().putAll(properties);
return this;
}
@Override
public ProducerBuilder<T> intercept(ProducerInterceptor... interceptors) {
if (interceptorList == null) {
interceptorList = new ArrayList<>();
}
interceptorList.addAll(Arrays.asList(interceptors));
return this;
}
@Override
@Deprecated
public ProducerBuilder<T> intercept(org.apache.pulsar.client.api.ProducerInterceptor<T>... interceptors) {
if (interceptorList == null) {
interceptorList = new ArrayList<>();
}
interceptorList.addAll(Arrays.stream(interceptors).map(ProducerInterceptorWrapper::new)
.collect(Collectors.toList()));
return this;
}
@Override
public ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate) {
conf.setAutoUpdatePartitions(autoUpdate);
return this;
}
@Override
public ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
return this;
}
@Override
public ProducerBuilder<T> enableMultiSchema(boolean multiSchema) {
conf.setMultiSchema(multiSchema);
return this;
}
@Override
public ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers) {
conf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
return this;
}
/**
* Use this config to automatically create an initial subscription when creating the topic.
* If this field is not set, the initial subscription will not be created.
* If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the producer will fail to
* be created.
* This method is limited to internal use. This method will only be used when the consumer creates the dlq producer.
*
* @param initialSubscriptionName Name of the initial subscription of the topic.
* @return the producer builder implementation instance
*/
public ProducerBuilderImpl<T> initialSubscriptionName(String initialSubscriptionName) {
conf.setInitialSubscriptionName(initialSubscriptionName);
return this;
}
private void setMessageRoutingMode() throws PulsarClientException {
if (conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) {
messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
} else if (conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() != null) {
messageRoutingMode(MessageRoutingMode.CustomPartition);
} else if (conf.getMessageRoutingMode() == MessageRoutingMode.CustomPartition
&& conf.getCustomMessageRouter() == null) {
throw new PulsarClientException("When 'messageRoutingMode' is " + MessageRoutingMode.CustomPartition
+ ", 'messageRouter' should be set");
} else if (conf.getMessageRoutingMode() != MessageRoutingMode.CustomPartition
&& conf.getCustomMessageRouter() != null) {
throw new PulsarClientException("When 'messageRouter' is set, 'messageRoutingMode' "
+ "should be set as " + MessageRoutingMode.CustomPartition);
}
}
@Override
public String toString() {
return conf != null ? conf.toString() : "";
}
}