blob: 73d97f1f33607f12a04d182f7af728b9c54b4a3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderInterceptor;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
@Data
public class ReaderConfigurationData<T> implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
@ApiModelProperty(
name = "topicNames",
required = true,
value = "Topic name"
)
private Set<String> topicNames = new HashSet<>();
@JsonIgnore
private MessageId startMessageId;
@JsonIgnore
private long startMessageFromRollbackDurationInSec;
@ApiModelProperty(
name = "receiverQueueSize",
value = "Size of a consumer's receiver queue.\n"
+ "\n"
+ "For example, the number of messages that can be accumulated by a consumer before an "
+ "application calls `Receive`.\n"
+ "\n"
+ "A value higher than the default value increases consumer throughput, though at the expense of "
+ "more memory utilization."
)
private int receiverQueueSize = 1000;
@ApiModelProperty(
name = "readerListener",
value = "A listener that is called for message received."
)
private ReaderListener<T> readerListener;
@ApiModelProperty(
name = "readerName",
value = "Reader name"
)
private String readerName = null;
@ApiModelProperty(
name = "subscriptionRolePrefix",
value = "Prefix of subscription role."
)
private String subscriptionRolePrefix = null;
@ApiModelProperty(
name = "subscriptionName",
value = "Subscription name"
)
private String subscriptionName = null;
@ApiModelProperty(
name = "cryptoKeyReader",
value = "Interface that abstracts the access to a key store."
)
private CryptoKeyReader cryptoKeyReader = null;
@ApiModelProperty(
name = "cryptoFailureAction",
value = "Consumer should take action when it receives a message that can not be decrypted.\n"
+ "* **FAIL**: this is the default option to fail messages until crypto succeeds.\n"
+ "* **DISCARD**: silently acknowledge and not deliver message to an application.\n"
+ "* **CONSUME**: deliver encrypted messages to applications. It is the application's"
+ " responsibility to decrypt the message.\n"
+ "\n"
+ "The message decompression fails.\n"
+ "\n"
+ "If messages contain batch messages, a client is not be able to retrieve individual messages in"
+ " batch.\n"
+ "\n"
+ "Delivered encrypted message contains {@link EncryptionContext} which contains encryption and "
+ "compression information in it using which application can decrypt consumed message payload."
)
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
@JsonIgnore
private transient MessageCrypto messageCrypto = null;
@ApiModelProperty(
name = "readCompacted",
value = "If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than a full "
+ "message backlog of a topic.\n"
+ "\n"
+ "A consumer only sees the latest value for each key in the compacted topic, up until reaching "
+ "the point in the topic message when compacting backlog. Beyond that point, send messages as "
+ "normal.\n"
+ "\n"
+ "`readCompacted` can only be enabled on subscriptions to persistent topics, which have a single "
+ "active consumer (for example, failure or exclusive subscriptions).\n"
+ "\n"
+ "Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions "
+ "leads to a subscription call throwing a `PulsarClientException`."
)
private boolean readCompacted = false;
@ApiModelProperty(
name = "resetIncludeHead",
value = "If set to true, the first message to be returned is the one specified by `messageId`.\n"
+ "\n"
+ "If set to false, the first message to be returned is the one next to the message specified by "
+ "`messageId`."
)
private boolean resetIncludeHead = false;
private transient List<Range> keyHashRanges;
private boolean poolMessages = false;
private boolean autoUpdatePartitions = true;
private long autoUpdatePartitionsIntervalSeconds = 60;
private transient List<ReaderInterceptor<T>> readerInterceptorList;
// max pending chunked message to avoid sending incomplete message into the queue and memory
private int maxPendingChunkedMessage = 10;
private boolean autoAckOldestChunkedMessageOnQueueFull = false;
private long expireTimeOfIncompleteChunkedMessageMillis = TimeUnit.MINUTES.toMillis(1);
private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
@JsonIgnore
public String getTopicName() {
if (topicNames.size() > 1) {
throw new IllegalArgumentException("topicNames needs to be = 1");
}
return topicNames.iterator().next();
}
@JsonIgnore
public void setTopicName(String topicNames) {
//Compatible with a single topic
this.topicNames.clear();
this.topicNames.add(topicNames);
}
@SuppressWarnings("unchecked")
public ReaderConfigurationData<T> clone() {
try {
ReaderConfigurationData<T> clone = (ReaderConfigurationData<T>) super.clone();
clone.setTopicNames(new HashSet<>(clone.getTopicNames()));
return clone;
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ReaderConfigurationData");
}
}
@SuppressFBWarnings({"EI_EXPOSE_REP"})
public MessageCrypto getMessageCrypto() {
return messageCrypto;
}
@SuppressFBWarnings({"EI_EXPOSE_REP2"})
public void setMessageCrypto(MessageCrypto messageCrypto) {
this.messageCrypto = messageCrypto;
}
}