blob: f0befa0b7540fb457bb0e30daf7e0f00ea026f69 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.v1.ConsumerV1Impl;
/**
 * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
 * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
 * will be able to use the same subscription name and the messages will be dispatched in a round robin fashion.
 *
 * <p>
 * Every setting is stored in an underlying {@link ConsumerConfigurationData} instance, which is exposed through
 * {@link #getConfigurationData()}.
 *
 * @deprecated Use {@link PulsarClient#newConsumer} to build and configure a {@link Consumer} instance
 */
@Deprecated
public class ConsumerConfiguration implements Serializable {

    /**
     * Resend shouldn't be requested before minAckTimeoutMillis.
     */
    static long minAckTimeoutMillis = 1000;

    private static final long serialVersionUID = 1L;

    private final ConsumerConfigurationData<byte[]> conf = new ConsumerConfigurationData<>();

    // The v1 listener exactly as supplied by the application; conf only holds a v2 adapter wrapping it.
    private MessageListener<byte[]> messageListener;

    /**
     * Creates a configuration with acknowledgment grouping disabled.
     */
    public ConsumerConfiguration() {
        // Disable acknowledgment grouping when using v1 API
        conf.setAcknowledgementsGroupTimeMicros(0);
    }

    /**
     * @return the configured timeout in milliseconds for unacked messages.
     */
    public long getAckTimeoutMillis() {
        return conf.getAckTimeoutMillis();
    }

    /**
     * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be at least
     * {@code minAckTimeoutMillis} (1000 ms unless that field has been changed).
     *
     * @param ackTimeout
     *            for unacked messages.
     * @param timeUnit
     *            unit in which the timeout is provided.
     * @return {@link ConsumerConfiguration}
     * @throws IllegalArgumentException
     *             if the timeout, converted to milliseconds, is below the minimum
     */
    public ConsumerConfiguration setAckTimeout(long ackTimeout, TimeUnit timeUnit) {
        long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
        checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
                "Ack timeout should be greater than or equal to " + minAckTimeoutMillis + " ms");
        // Store the value that was validated rather than converting a second time.
        conf.setAckTimeoutMillis(ackTimeoutMillis);
        return this;
    }

    /**
     * @return the configured subscription type
     */
    public SubscriptionType getSubscriptionType() {
        return conf.getSubscriptionType();
    }

    /**
     * Select the subscription type to be used when subscribing to the topic.
     * <p>
     * Default is {@link SubscriptionType#Exclusive}
     *
     * @param subscriptionType
     *            the subscription type value
     * @return {@link ConsumerConfiguration}
     * @throws NullPointerException
     *             if {@code subscriptionType} is null
     */
    public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
        Objects.requireNonNull(subscriptionType, "subscriptionType");
        conf.setSubscriptionType(subscriptionType);
        return this;
    }

    /**
     * @return the configured {@link MessageListener} for the consumer
     */
    public MessageListener<byte[]> getMessageListener() {
        return messageListener;
    }

    /**
     * Sets a {@link MessageListener} for the consumer
     * <p>
     * When a {@link MessageListener} is set, application will receive messages through it. Calls to
     * {@link Consumer#receive()} will not be allowed.
     *
     * @param messageListener
     *            the listener object
     * @return {@link ConsumerConfiguration}
     * @throws NullPointerException
     *             if {@code messageListener} is null
     */
    public ConsumerConfiguration setMessageListener(MessageListener<byte[]> messageListener) {
        Objects.requireNonNull(messageListener, "messageListener");
        this.messageListener = messageListener;
        // The underlying configuration expects a v2 listener, so register an adapter that hands the
        // v1 listener a v1 view (ConsumerV1Impl) of the v2 consumer on every callback.
        conf.setMessageListener(new org.apache.pulsar.shade.client.api.v2.MessageListener<byte[]>() {
            @Override
            public void received(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer,
                    Message<byte[]> msg) {
                messageListener.received(new ConsumerV1Impl(consumer), msg);
            }

            @Override
            public void reachedEndOfTopic(org.apache.pulsar.shade.client.api.v2.Consumer<byte[]> consumer) {
                messageListener.reachedEndOfTopic(new ConsumerV1Impl(consumer));
            }
        });
        return this;
    }

    /**
     * @return this configured {@link ConsumerEventListener} for the consumer.
     * @see #setConsumerEventListener(ConsumerEventListener)
     * @since 2.0
     */
    public ConsumerEventListener getConsumerEventListener() {
        return conf.getConsumerEventListener();
    }

    /**
     * Sets a {@link ConsumerEventListener} for the consumer.
     *
     * <p>
     * The consumer group listener is used for receiving consumer state change in a consumer group for failover
     * subscription. Application can then react to the consumer state changes.
     *
     * <p>
     * This change is experimental. It is subject to changes coming in release 2.0.
     *
     * @param listener
     *            the consumer group listener object
     * @return consumer configuration
     * @throws NullPointerException
     *             if {@code listener} is null
     * @since 2.0
     */
    public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
        Objects.requireNonNull(listener, "listener");
        conf.setConsumerEventListener(listener);
        return this;
    }

    /**
     * @return the configured receiver queue size value
     */
    public int getReceiverQueueSize() {
        return conf.getReceiverQueueSize();
    }

    /**
     * @return the configured max total receiver queue size across partitions
     */
    public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
        return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
    }

    /**
     * Set the max total receiver queue size across partitions.
     * <p>
     * This setting will be used to reduce the receiver queue size for individual partitions
     * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
     *
     * @param maxTotalReceiverQueueSizeAcrossPartitions
     *            the new limit; must not be smaller than the currently configured receiver queue size
     * @throws IllegalArgumentException
     *             if the value is smaller than the currently configured receiver queue size
     */
    public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
        checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize(),
                "Max total receiver queue size across partitions cannot be smaller than the receiver queue size");
        conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
    }

    /**
     * @return the CryptoKeyReader
     */
    public CryptoKeyReader getCryptoKeyReader() {
        return conf.getCryptoKeyReader();
    }

    /**
     * Sets a {@link CryptoKeyReader}.
     *
     * @param cryptoKeyReader
     *            CryptoKeyReader object
     * @return {@link ConsumerConfiguration}
     * @throws NullPointerException
     *             if {@code cryptoKeyReader} is null
     */
    public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
        Objects.requireNonNull(cryptoKeyReader, "cryptoKeyReader");
        conf.setCryptoKeyReader(cryptoKeyReader);
        return this;
    }

    /**
     * Sets the ConsumerCryptoFailureAction to the value specified.
     *
     * @param action
     *            consumer action
     */
    public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
        conf.setCryptoFailureAction(action);
    }

    /**
     * @return The ConsumerCryptoFailureAction
     */
    public ConsumerCryptoFailureAction getCryptoFailureAction() {
        return conf.getCryptoFailureAction();
    }

    /**
     * Sets the size of the consumer receive queue.
     * <p>
     * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
     * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer
     * throughput at the expense of bigger memory utilization.
     * <p>
     * <b>Setting the consumer queue size as zero</b>
     * <ul>
     * <li>Decreases the throughput of the consumer, by disabling pre-fetching of messages. This approach improves the
     * message distribution on shared subscription, by pushing messages only to the consumers that are ready to process
     * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
     * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
     * zero.</li>
     * <li>Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with
     * broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives
     * exception in callback. <b>consumer will not be able to receive any further message unless batch-message in
     * pipeline is removed</b></li>
     * </ul>
     * <p>
     * Default value is {@code 1000} messages and should be good for most use cases.
     *
     * @param receiverQueueSize
     *            the new receiver queue size value
     * @return {@link ConsumerConfiguration}
     * @throws IllegalArgumentException
     *             if {@code receiverQueueSize} is negative
     */
    public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
        checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
        conf.setReceiverQueueSize(receiverQueueSize);
        return this;
    }

    /**
     * @return the consumer name
     */
    public String getConsumerName() {
        return conf.getConsumerName();
    }

    /**
     * Set the consumer name.
     *
     * @param consumerName
     *            the name to assign; must be neither null nor empty
     * @return {@link ConsumerConfiguration}
     * @throws IllegalArgumentException
     *             if {@code consumerName} is null or empty
     */
    public ConsumerConfiguration setConsumerName(String consumerName) {
        checkArgument(consumerName != null && !consumerName.isEmpty(), "Consumer name cannot be null or empty");
        conf.setConsumerName(consumerName);
        return this;
    }

    /**
     * @return the configured priority level
     * @see #setPriorityLevel(int)
     */
    public int getPriorityLevel() {
        return conf.getPriorityLevel();
    }

    /**
     * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
     * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) <br>
     * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
     * permits, else broker will consider next priority level consumers. <br>
     * If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
     * messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
     *
     * <pre>
     * Consumer  PriorityLevel  Permits
     * C1        0              2
     * C2        0              1
     * C3        0              1
     * C4        1              2
     * C5        1              1
     * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
     * </pre>
     *
     * @param priorityLevel
     *            the priority level to store
     */
    public void setPriorityLevel(int priorityLevel) {
        conf.setPriorityLevel(priorityLevel);
    }

    /**
     * @return whether reading from the compacted topic is enabled
     * @see #setReadCompacted(boolean)
     */
    public boolean getReadCompacted() {
        return conf.isReadCompacted();
    }

    /**
     * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
     * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
     * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
     * point, the messages will be sent as normal.
     *
     * <p>
     * readCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer
     * (i.e. failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or
     * on a shared subscription will lead to the subscription call throwing a PulsarClientException.
     *
     * @param readCompacted
     *            whether to read from the compacted topic
     * @return {@link ConsumerConfiguration}
     */
    public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
        conf.setReadCompacted(readCompacted);
        return this;
    }

    /**
     * Set a name/value property with this consumer.
     *
     * @param key
     *            the property name; must not be null
     * @param value
     *            the property value; must not be null
     * @return {@link ConsumerConfiguration}
     * @throws IllegalArgumentException
     *             if {@code key} or {@code value} is null
     */
    public ConsumerConfiguration setProperty(String key, String value) {
        checkArgument(key != null, "property key cannot be null");
        checkArgument(value != null, "property value cannot be null");
        conf.getProperties().put(key, value);
        return this;
    }

    /**
     * Add all the properties in the provided map.
     *
     * <p>
     * Entries are held to the same rule as {@link #setProperty(String, String)}: neither key nor value may be null.
     *
     * @param properties
     *            the properties to add; must not be null
     * @return {@link ConsumerConfiguration}
     * @throws NullPointerException
     *             if {@code properties} is null
     * @throws IllegalArgumentException
     *             if any entry has a null key or a null value
     */
    public ConsumerConfiguration setProperties(Map<String, String> properties) {
        Objects.requireNonNull(properties, "properties");
        // Validate every entry before writing anything so a bad entry cannot leave the
        // configuration partially updated.
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            checkArgument(entry.getKey() != null, "property key cannot be null");
            checkArgument(entry.getValue() != null, "property value cannot be null");
        }
        conf.getProperties().putAll(properties);
        return this;
    }

    /**
     * @return the properties map obtained from the underlying configuration data (the same map that
     *         {@link #setProperty(String, String)} writes into)
     */
    public Map<String, String> getProperties() {
        return conf.getProperties();
    }

    /**
     * @return the underlying {@link ConsumerConfigurationData} that stores the settings of this configuration
     */
    public ConsumerConfigurationData<byte[]> getConfigurationData() {
        return conf;
    }

    /**
     * @param subscriptionInitialPosition
     *            the initial position at which to set the cursor when subscribing to the topic for the first time.
     *            Default is {@link SubscriptionInitialPosition#Latest}
     * @return {@link ConsumerConfiguration}
     */
    public ConsumerConfiguration setSubscriptionInitialPosition(
            SubscriptionInitialPosition subscriptionInitialPosition) {
        conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
        return this;
    }

    /**
     * @return the configured {@link SubscriptionInitialPosition} for the consumer
     */
    public SubscriptionInitialPosition getSubscriptionInitialPosition() {
        return conf.getSubscriptionInitialPosition();
    }

    /**
     * @return the configured negative ack {@link RedeliveryBackoff} for the consumer
     */
    public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
        return conf.getNegativeAckRedeliveryBackoff();
    }

    /**
     * @param negativeAckRedeliveryBackoff the negative ack redelivery backoff policy.
     * Default value is: MultiplierRedeliveryBackoff
     * @return the {@link ConsumerConfiguration}
     */
    public ConsumerConfiguration setNegativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff) {
        conf.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
        return this;
    }

    /**
     * @return the configured ack timeout {@link RedeliveryBackoff} for the consumer
     */
    public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
        return conf.getAckTimeoutRedeliveryBackoff();
    }

    /**
     * @param ackTimeoutRedeliveryBackoff redelivery backoff policy for ack timeout.
     * Default value is: MultiplierRedeliveryBackoff
     * @return the {@link ConsumerConfiguration}
     */
    public ConsumerConfiguration setAckTimeoutRedeliveryBackoff(RedeliveryBackoff ackTimeoutRedeliveryBackoff) {
        conf.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
        return this;
    }
}