[Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)
Fixes #2122
### Motivation
Current backoff strategy is set by default and is too aggressive. What we should do is allow it to be configurable by the user.
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not sure)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 713007b..8907fc3 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -179,8 +179,9 @@
if (msg != null) {
MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
- messageRetries.getNumRetries())) {
- Utils.sleep(100);
+ messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
+ clientConf.getMaxBackoffIntervalNanos())) {
+ Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
} else {
// remove the message from the queue and emit to the topology, only if it should not be backedoff
LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());