(TWILL-151) Improve Logging error when fetching message after Kafka server is stopped
This closes #65 on GitHub.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 8cfe889..0299e56 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -28,6 +28,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
+
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
@@ -39,6 +40,7 @@
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
+
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerInfo;
@@ -49,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Iterator;
import java.util.List;
@@ -374,9 +377,15 @@
invokeCallback(messages, offset);
backoff.reset();
} catch (Throwable t) {
- if (running || !(t instanceof ClosedByInterruptException)) {
- // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
- LOG.info("Exception when fetching message on {}.", topicPart, t);
+ // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
+ if (!running) {
+ LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
+ } else {
+ if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
+ LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
+ } else {
+ LOG.info("Exception when fetching message on {}.", topicPart, t);
+ }
backoff.backoff();
}
consumers.refresh(consumerEntry.getKey());