blob: 958c6ce844d34e9d472e3b904d18232f150dac9a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProducerConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
public static final int DEFAULT_BATCHING_MAX_MESSAGES = 1000;
public static final int DEFAULT_MAX_PENDING_MESSAGES = 0;
public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 0;
private String topicName = null;
private String producerName = null;
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES;
private int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
private MessageRoutingMode messageRoutingMode = null;
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
@JsonIgnore
private MessageRouter customMessageRouter = null;
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
private int batchingPartitionSwitchFrequencyByPublishDelay = 10;
private int batchingMaxMessages = DEFAULT_BATCHING_MAX_MESSAGES;
private int batchingMaxBytes = 128 * 1024; // 128KB (keep the maximum consistent as previous versions)
private boolean batchingEnabled = true; // enabled by default
@JsonIgnore
private BatcherBuilder batcherBuilder = BatcherBuilder.DEFAULT;
private boolean chunkingEnabled = false;
@JsonIgnore
private CryptoKeyReader cryptoKeyReader;
@JsonIgnore
private transient MessageCrypto messageCrypto = null;
@JsonIgnore
private Set<String> encryptionKeys = new TreeSet<>();
private CompressionType compressionType = CompressionType.NONE;
// Cannot use Optional<Long> since it's not serializable
private Long initialSequenceId = null;
private boolean autoUpdatePartitions = true;
private long autoUpdatePartitionsIntervalSeconds = 60;
private boolean multiSchema = true;
private ProducerAccessMode accessMode = ProducerAccessMode.Shared;
private boolean lazyStartPartitionedProducers = false;
private SortedMap<String, String> properties = new TreeMap<>();
private String initialSubscriptionName = null;
/**
*
* Returns true if encryption keys are added.
*
*/
@JsonIgnore
public boolean isEncryptionEnabled() {
return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
}
public ProducerConfigurationData clone() {
try {
ProducerConfigurationData c = (ProducerConfigurationData) super.clone();
c.encryptionKeys = Sets.newTreeSet(this.encryptionKeys);
c.properties = new TreeMap<>(this.properties);
return c;
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ProducerConfigurationData", e);
}
}
public void setProducerName(String producerName) {
checkArgument(StringUtils.isNotBlank(producerName), "producerName cannot be blank");
this.producerName = producerName;
}
public void setMaxPendingMessages(int maxPendingMessages) {
checkArgument(maxPendingMessages >= 0, "maxPendingMessages needs to be >= 0");
this.maxPendingMessages = maxPendingMessages;
}
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages,
"maxPendingMessagesAcrossPartitions needs to be >= maxPendingMessages");
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
}
public void setBatchingMaxMessages(int batchingMaxMessages) {
this.batchingMaxMessages = batchingMaxMessages;
}
public void setBatchingMaxBytes(int batchingMaxBytes) {
this.batchingMaxBytes = batchingMaxBytes;
}
public void setSendTimeoutMs(int sendTimeout, TimeUnit timeUnit) {
checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0");
this.sendTimeoutMs = timeUnit.toMillis(sendTimeout);
}
public void setBatchingMaxPublishDelayMicros(long batchDelay, TimeUnit timeUnit) {
long delayInMs = timeUnit.toMillis(batchDelay);
checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
this.batchingMaxPublishDelayMicros = timeUnit.toMicros(batchDelay);
}
public void setBatchingPartitionSwitchFrequencyByPublishDelay(int frequencyByPublishDelay) {
checkArgument(frequencyByPublishDelay >= 1, "configured value for partition switch frequency must be >= 1");
this.batchingPartitionSwitchFrequencyByPublishDelay = frequencyByPublishDelay;
}
public long batchingPartitionSwitchFrequencyIntervalMicros() {
return this.batchingPartitionSwitchFrequencyByPublishDelay * batchingMaxPublishDelayMicros;
}
public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
checkArgument(interval > 0, "interval needs to be > 0");
this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
}
}