(TWILL-151) Improve Logging error when fetching message after Kafka server is stopped

This closes #65 on GitHub.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 8cfe889..0299e56 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
+
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -39,6 +40,7 @@
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
+
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.kafka.client.BrokerInfo;
@@ -49,6 +51,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.ConnectException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Iterator;
 import java.util.List;
@@ -374,9 +377,15 @@
           invokeCallback(messages, offset);
           backoff.reset();
         } catch (Throwable t) {
-          if (running || !(t instanceof ClosedByInterruptException)) {
-            // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
-            LOG.info("Exception when fetching message on {}.", topicPart, t);
+          // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
+          if (!running) {
+            LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
+          } else {
+            if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
+              LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
+            } else {
+              LOG.info("Exception when fetching message on {}.", topicPart, t);
+            }
             backoff.backoff();
           }
           consumers.refresh(consumerEntry.getKey());