[pulsar-storm] provide auto-unsubscribe option in pulsar-spout (#4238)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 2588741..5156b50 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -103,6 +103,11 @@
public void close() {
try {
LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
+
+ if (pulsarSpoutConf.isAutoUnsubscribe()) {
+ consumer.unsubscribe();
+ }
+
if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
consumer.close();
}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 8bd139b..bb8d8c8 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -45,6 +45,7 @@
private boolean sharedConsumerEnabled = false;
private SubscriptionType subscriptionType = SubscriptionType.Shared;
+ private boolean autoUnsubscribe = false;
/**
* @return the subscription name for the consumer in the spout
@@ -146,4 +147,17 @@
public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
this.sharedConsumerEnabled = sharedConsumerEnabled;
}
+
+ public boolean isAutoUnsubscribe() {
+ return autoUnsubscribe;
+ }
+
+ /**
+ * It unsubscribes the subscription when spout gets closed in the topology.
+ *
+ * @param autoUnsubscribe
+ */
+ public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+ this.autoUnsubscribe = autoUnsubscribe;
+ }
}