Merge pull request #1370 from siyuanh/2.0-multikafka
SPOI-4177 #comment null check before process stats
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 7f8688f..8b1b42a 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -69,6 +69,7 @@
import kafka.message.Message;
import kafka.message.MessageAndOffset;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.slf4j.Logger;
@@ -660,7 +661,7 @@
{
StatsListener.Response resp = new StatsListener.Response();
List<KafkaConsumer.KafkaMeterStats> kstats = extractKafkaStats(stats);
- resp.repartitionRequired = isPartitionRequired(stats.getOperatorId(), kstats);
+ resp.repartitionRequired = !CollectionUtils.isEmpty(kstats) && isPartitionRequired(stats.getOperatorId(), kstats);
return resp;
}
@@ -674,6 +675,9 @@
private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats)
{
+ if (CollectionUtils.isEmpty(stats.getLastWindowedStats())) {
+ return null;
+ }
//preprocess the stats
List<KafkaConsumer.KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>();
for (Stats.OperatorStats os : stats.getLastWindowedStats()) {