Merge pull request #1370 from siyuanh/2.0-multikafka

SPOI-4177 #comment null check before processing stats
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 7f8688f..8b1b42a 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -69,6 +69,7 @@
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.slf4j.Logger;
@@ -660,7 +661,7 @@
   {
     StatsListener.Response resp = new StatsListener.Response();
     List<KafkaConsumer.KafkaMeterStats> kstats = extractKafkaStats(stats);
-    resp.repartitionRequired = isPartitionRequired(stats.getOperatorId(), kstats);
+    resp.repartitionRequired = !CollectionUtils.isEmpty(kstats) && isPartitionRequired(stats.getOperatorId(), kstats);
     return resp;
   }
 
@@ -674,6 +675,9 @@
 
   private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats)
   {
+    if (CollectionUtils.isEmpty(stats.getLastWindowedStats())) {
+      return null;
+    }
     //preprocess the stats
     List<KafkaConsumer.KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>();
     for (Stats.OperatorStats os : stats.getLastWindowedStats()) {