blob: 0ef7686ac2659fec15a2f72fb00d3fe1f2f6d6ed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.storm;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
/**
* Class used to specify pulsar spout configuration
*
*
*/
public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
/**
*
*/
private static final long serialVersionUID = 1L;
public static final long DEFAULT_FAILED_RETRIES_TIMEOUT_NANO = TimeUnit.SECONDS.toNanos(60);
public static final int DEFAULT_MAX_FAILED_RETRIES = -1;
private String subscriptionName = null;
private MessageToValuesMapper messageToValuesMapper = null;
private long failedRetriesTimeoutNano = DEFAULT_FAILED_RETRIES_TIMEOUT_NANO;
private int maxFailedRetries = DEFAULT_MAX_FAILED_RETRIES;
private boolean sharedConsumerEnabled = false;
private SubscriptionType subscriptionType = SubscriptionType.Shared;
private boolean autoUnsubscribe = false;
private boolean durableSubscription = true;
// read position if non-durable subscription is enabled : default oldest message available in topic
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;
/**
* @return the subscription name for the consumer in the spout
*/
public String getSubscriptionName() {
return subscriptionName;
}
/**
* Sets the subscription name for the consumer in the spout
*
* @param subscriptionName
*/
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public SubscriptionType getSubscriptionType() {
return subscriptionType;
}
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
/**
* @return the mapper to convert pulsar message to a storm tuple
*/
public MessageToValuesMapper getMessageToValuesMapper() {
return messageToValuesMapper;
}
/**
* Sets the mapper to convert pulsar message to a storm tuple.
* <p>
* Note: If the mapper returns null, the message is not emitted to the collector and is acked immediately
* </p>
*
* @param mapper
*/
public void setMessageToValuesMapper(MessageToValuesMapper mapper) {
this.messageToValuesMapper = Objects.requireNonNull(mapper);
}
/**
*
* @param unit
* @return the timeout for retrying failed messages
*/
public long getFailedRetriesTimeout(TimeUnit unit) {
return unit.convert(failedRetriesTimeoutNano, TimeUnit.NANOSECONDS);
}
/**
* Sets the timeout within which the spout will re-inject failed messages with an exponential backoff <i>(default:
* 60 seconds)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message will
* be retried forever till it is successfully processed or max message retry count is reached, whichever comes
* first.
*
* @param failedRetriesTimeout
* @param unit
*/
public void setFailedRetriesTimeout(long failedRetriesTimeout, TimeUnit unit) {
this.failedRetriesTimeoutNano = unit.toNanos(failedRetriesTimeout);
}
/**
*
* @return the maximum number of times a failed message will be retried
*/
public int getMaxFailedRetries() {
return maxFailedRetries;
}
/**
* Sets the maximum number of times the spout will re-inject failed messages with an exponential backoff
* <i>(default: -1)</i> Note: If set to 0, the message will not be retried when failed. If set to < 0, the message
* will be retried forever till it is successfully processed or configured timeout expires, whichever comes first.
*
* @param maxFailedRetries
*/
public void setMaxFailedRetries(int maxFailedRetries) {
this.maxFailedRetries = maxFailedRetries;
}
/**
*
* @return if the consumer is shared across different executors of a spout
*/
public boolean isSharedConsumerEnabled() {
return sharedConsumerEnabled;
}
/**
* Sets whether the consumer will be shared across different executors of a spout. <i>(default: false)</i>
*
* @param sharedConsumerEnabled
*/
public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
this.sharedConsumerEnabled = sharedConsumerEnabled;
}
public boolean isAutoUnsubscribe() {
return autoUnsubscribe;
}
/**
* It unsubscribes the subscription when spout gets closed in the topology.
*
* @param autoUnsubscribe
*/
public void setAutoUnsubscribe(boolean autoUnsubscribe) {
this.autoUnsubscribe = autoUnsubscribe;
}
public boolean isDurableSubscription() {
return durableSubscription;
}
/**
* if subscription is not durable then it creates non-durable reader to start reading from the
* {@link #setNonDurableSubscriptionReadPosition(MessagePosition)} in topic.
*
* @param nonDurableSubscription
*/
public void setDurableSubscription(boolean durableSubscription) {
this.durableSubscription = durableSubscription;
}
public MessageId getNonDurableSubscriptionReadPosition() {
return nonDurableSubscriptionReadPosition;
}
/**
* Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest.
*
* @param nonDurableSubscriptionReadPosition
*/
public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
}
}