(TWILL-161) Added back off logic to SimplyKafkaConsumer

- Avoid excessive amount of polling and logs in case of failure

This closes #76 on GitHub.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 230521c..8cfe889 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -72,7 +72,8 @@
   private static final int SO_TIMEOUT = 5 * 1000;           // 5 seconds.
   private static final int MAX_WAIT = 1000;                 // 1 second.
   private static final long CONSUMER_EXPIRE_MINUTES = 1L;   // close consumer if not used for 1 minute.
-  private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000L; // Sleep for 2 seconds if failure in consumer.
+  private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer.
+  private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer.
   private static final long EMPTY_FETCH_WAIT = 500L;        // Sleep for 500 ms if no message is fetched.
 
   private final BrokerService brokerService;
@@ -328,16 +329,12 @@
       final AtomicLong offset = new AtomicLong(startOffset);
 
       Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
-
+      ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
+                                                          MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
       while (running) {
         if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
           LOG.debug("No leader for topic partition {}.", topicPart);
-          try {
-            TimeUnit.MILLISECONDS.sleep(CONSUMER_FAILURE_RETRY_INTERVAL);
-          } catch (InterruptedException e) {
-            // OK to ignore this, as interrupt would be caused by thread termination.
-            LOG.trace("Consumer sleep interrupted.", e);
-          }
+          backoff.backoff();
           continue;
         }
 
@@ -375,10 +372,12 @@
 
           // Call the callback
           invokeCallback(messages, offset);
+          backoff.reset();
         } catch (Throwable t) {
           if (running || !(t instanceof ClosedByInterruptException)) {
             // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
             LOG.info("Exception when fetching message on {}.", topicPart, t);
+            backoff.backoff();
           }
           consumers.refresh(consumerEntry.getKey());
           consumerEntry = null;
@@ -477,5 +476,38 @@
         }
       };
     }
+
+    /**
+     * Helper class for performance exponential backoff on message fetching failure.
+     */
+    private final class ExponentialBackoff {
+      private final long initialBackoff;
+      private final long maxBackoff;
+      private final TimeUnit backoffUnit;
+      private int failureCount = 0;
+
+      private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) {
+        this.initialBackoff = initialBackoff;
+        this.maxBackoff = maxBackoff;
+        this.backoffUnit = backoffUnit;
+      }
+
+      void backoff() {
+        failureCount++;
+        long multiplier = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1));
+        long backoff = Math.min(initialBackoff * multiplier, maxBackoff);
+        backoff = backoff < 0 ? maxBackoff : backoff;
+        try {
+          backoffUnit.sleep(backoff);
+        } catch (InterruptedException e) {
+          // OK to ignore since this method is called from the consumer thread only, which on thread shutdown,
+          // the thread will be interrupted
+        }
+      }
+
+      void reset() {
+        failureCount = 0;
+      }
+    }
   }
 }
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 93119ab..4ac8ae4 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -42,8 +42,11 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -84,6 +87,71 @@
   }
 
   @Test
+  public void testKafkaClientReconnect() throws Exception {
+    String topic = "backoff";
+    Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
+    EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig);
+
+    ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
+    zkClient.startAndWait();
+    try {
+      zkClient.create("/", null, CreateMode.PERSISTENT).get();
+
+      ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
+      kafkaClient.startAndWait();
+
+      try {
+        server.startAndWait();
+        try {
+          // Publish a messages
+          createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
+
+          // Creater a consumer
+          final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+          Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
+            .consume(new KafkaConsumer.MessageCallback() {
+              @Override
+              public void onReceived(Iterator<FetchedMessage> messages) {
+                while (messages.hasNext()) {
+                  queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+                }
+              }
+
+              @Override
+              public void finished() {
+              }
+            });
+
+          // Wait for the first message
+          Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS));
+
+          // Shutdown the server
+          server.stopAndWait();
+
+          // Start the server again.
+          // Needs to create a new instance with the same config since guava service cannot be restarted
+          server = new EmbeddedKafkaServer(kafkServerConfig);
+          server.startAndWait();
+
+          // Publish another message
+          createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
+
+          // Should be able to get the second message
+          Assert.assertEquals("0 Second message", queue.poll(60, TimeUnit.SECONDS));
+
+          cancel.cancel();
+        } finally {
+          kafkaClient.stopAndWait();
+        }
+      } finally {
+        server.stopAndWait();
+      }
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+
+  @Test
   public void testKafkaClient() throws Exception {
     String topic = "testClient";