[pulsar-storm] provide auto-unsubscribe option in pulsar-spout (#4238)

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 2588741..5156b50 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -103,6 +103,11 @@
     public void close() {
         try {
             LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());
+            
+            if (pulsarSpoutConf.isAutoUnsubscribe()) {
+                consumer.unsubscribe();
+            }
+            
             if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
                 consumer.close();
             }
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
index 8bd139b..bb8d8c8 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java
@@ -45,6 +45,7 @@
     private boolean sharedConsumerEnabled = false;
 
     private SubscriptionType subscriptionType = SubscriptionType.Shared;
+    private boolean autoUnsubscribe = false;
 
     /**
      * @return the subscription name for the consumer in the spout
@@ -146,4 +147,17 @@
     public void setSharedConsumerEnabled(boolean sharedConsumerEnabled) {
         this.sharedConsumerEnabled = sharedConsumerEnabled;
     }
+    
+    public boolean isAutoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * It unsubscribes the subscription when spout gets closed in the topology.
+     * 
+     * @param autoUnsubscribe
+     */
+    public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
 }