(TWILL-161) Added back off logic to SimpleKafkaConsumer
- Avoid excessive amount of polling and logs in case of failure
This closes #76 on GitHub.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 230521c..8cfe889 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -72,7 +72,8 @@
private static final int SO_TIMEOUT = 5 * 1000; // 5 seconds.
private static final int MAX_WAIT = 1000; // 1 second.
private static final long CONSUMER_EXPIRE_MINUTES = 1L; // close consumer if not used for 1 minute.
- private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000L; // Sleep for 2 seconds if failure in consumer.
+ private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer.
+ private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer.
private static final long EMPTY_FETCH_WAIT = 500L; // Sleep for 500 ms if no message is fetched.
private final BrokerService brokerService;
@@ -328,16 +329,12 @@
final AtomicLong offset = new AtomicLong(startOffset);
Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
-
+ ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
+ MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
while (running) {
if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
LOG.debug("No leader for topic partition {}.", topicPart);
- try {
- TimeUnit.MILLISECONDS.sleep(CONSUMER_FAILURE_RETRY_INTERVAL);
- } catch (InterruptedException e) {
- // OK to ignore this, as interrupt would be caused by thread termination.
- LOG.trace("Consumer sleep interrupted.", e);
- }
+ backoff.backoff();
continue;
}
@@ -375,10 +372,12 @@
// Call the callback
invokeCallback(messages, offset);
+ backoff.reset();
} catch (Throwable t) {
if (running || !(t instanceof ClosedByInterruptException)) {
// Only log if it is still running, otherwise, it just the interrupt caused by the stop.
LOG.info("Exception when fetching message on {}.", topicPart, t);
+ backoff.backoff();
}
consumers.refresh(consumerEntry.getKey());
consumerEntry = null;
@@ -477,5 +476,49 @@
}
};
}
+
+    /**
+     * Helper class for performing exponential backoff on message fetching failure.
+     * Every consecutive {@link #backoff()} call sleeps twice as long as the previous one, starting at
+     * {@code initialBackoff} and never exceeding {@code maxBackoff}, until {@link #reset()} is called.
+     * Both bounds are expected to be non-negative.
+     */
+    private final class ExponentialBackoff {
+      private final long initialBackoff;
+      private final long maxBackoff;
+      private final TimeUnit backoffUnit;
+      private int failureCount = 0;
+
+      private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) {
+        this.initialBackoff = initialBackoff;
+        this.maxBackoff = maxBackoff;
+        this.backoffUnit = backoffUnit;
+      }
+
+      /**
+       * Records one more failure and sleeps for the corresponding backoff time.
+       */
+      void backoff() {
+        // Cap the count so that the shift below always yields a positive multiplier (at most 2^62).
+        failureCount = Math.min(failureCount + 1, Long.SIZE - 1);
+        long multiplier = 1L << (failureCount - 1);
+        // Compare through division instead of multiplying first: the product can wrap around to zero
+        // (e.g. 100 * 2^62), which would skip the sleep entirely after a long series of failures.
+        long backoff = initialBackoff > maxBackoff / multiplier ? maxBackoff : initialBackoff * multiplier;
+        try {
+          backoffUnit.sleep(backoff);
+        } catch (InterruptedException e) {
+          // OK to ignore since this method is called from the consumer thread only, which on thread shutdown,
+          // the thread will be interrupted
+        }
+      }
+
+      /**
+       * Clears the failure count so that the next failure starts again from the initial backoff.
+       */
+      void reset() {
+        failureCount = 0;
+      }
+    }
}
}
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 93119ab..4ac8ae4 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -42,8 +42,11 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
@@ -84,6 +87,71 @@
}
@Test
+ public void testKafkaClientReconnect() throws Exception {
+ String topic = "backoff";
+ Properties kafkServerConfig = generateKafkaConfig(zkServer.getConnectionStr() + "/backoff");
+ EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkServerConfig);
+ // Verifies that a consumer created before a Kafka server restart still receives messages published after it.
+ ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build();
+ zkClient.startAndWait();
+ try {
+ zkClient.create("/", null, CreateMode.PERSISTENT).get();
+
+ ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient);
+ kafkaClient.startAndWait();
+
+ try {
+ server.startAndWait();
+ try {
+ // Publish a message
+ createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start();
+
+ // Create a consumer that pushes every decoded payload into the queue
+ final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+ Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0)
+ .consume(new KafkaConsumer.MessageCallback() {
+ @Override
+ public void onReceived(Iterator<FetchedMessage> messages) {
+ while (messages.hasNext()) {
+ queue.offer(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+ }
+ }
+
+ @Override
+ public void finished() {
+ }
+ });
+
+ // Wait for the first message
+ Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS));
+
+ // Shut down the server
+ server.stopAndWait();
+
+ // Start the server again.
+ // Needs to create a new instance with the same config since guava service cannot be restarted
+ server = new EmbeddedKafkaServer(kafkServerConfig);
+ server.startAndWait();
+
+ // Publish another message
+ createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
+
+ // Should be able to get the second message
+ Assert.assertEquals("0 Second message", queue.poll(60, TimeUnit.SECONDS));
+
+ cancel.cancel();
+ } finally {
+ kafkaClient.stopAndWait();
+ }
+ } finally {
+ server.stopAndWait();
+ }
+ } finally {
+ zkClient.stopAndWait();
+ }
+ }
+
+ @Test
public void testKafkaClient() throws Exception {
String topic = "testClient";