Fix NPE in monitor thread when kafka cluster becomes unavailable.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 3ad9f37..a6c7ade 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.validation.constraints.NotNull;
 
@@ -298,6 +299,9 @@
    */
   private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
 
+  private transient AtomicReference<Throwable> monitorException;
+  private transient AtomicInteger monitorExceptionCount;
+
   @Override
   public void create()
   {
@@ -325,6 +329,8 @@
   @Override
   public void start()
   {
+    monitorException = new AtomicReference<Throwable>(null);
+    monitorExceptionCount = new AtomicInteger(0);
     super.start();
 
     // thread to consume the kafka data
@@ -338,71 +344,7 @@
     metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
 
     // start one monitor thread to monitor the leader broker change and trigger some action
-    final SimpleKafkaConsumer ref = this;
-    metadataRefreshExecutor.scheduleAtFixedRate(new Runnable() {
-
-      private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
-
-      @Override
-      public void run()
-      {
-        if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
-          logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
-          Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
-          if (pms == null) {
-            // retrieve metadata fail add retry count and return
-            retryCounter.getAndAdd(1);
-            return;
-          }
-
-          for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
-            for (PartitionMetadata pm : pmLEntry.getValue()) {
-              KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
-              if (!kps.contains(kp)) {
-                // Out of this consumer's scope
-                continue;
-              }
-              Broker b = pm.leader();
-              Broker oldB = partitionToBroker.put(kp, b);
-              if(b.equals(oldB)) {
-                continue;
-              }
-              // add to positive
-              deltaPositive.put(b,kp);
-
-              // always update the latest connection information
-              stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
-            }
-          }
-
-          // remove from map if the thread is done (partitions on this broker has all been reassigned to others(or temporarily not available) for
-          // example)
-          for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext();) {
-            Entry<Broker, ConsumerThread> item = iterator.next();
-            if (item.getValue().getThreadItSelf().isDone()) {
-              iterator.remove();
-            }
-          }
-
-          for (Broker b : deltaPositive.keySet()) {
-            if (!simpleConsumerThreads.containsKey(b)) {
-              // start thread for new broker
-              ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
-              ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
-              simpleConsumerThreads.put(b, ct);
-
-            } else {
-              simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
-            }
-          }
-
-          deltaPositive.clear();
-
-          // reset to 0 if it reconnect to the broker which has current broker metadata
-          retryCounter.set(0);
-        }
-      }
-    }, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
+    metadataRefreshExecutor.scheduleAtFixedRate(new MetaDataMonitorTask(this) , 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -514,4 +456,107 @@
     this.kps = partitionIds;
     resetOffset(startOffset);
   }
+
+  protected Throwable getMonitorException()
+  {
+    return monitorException.get();
+  }
+
+  protected int getMonitorExceptionCount()
+  {
+    return monitorExceptionCount.get();
+  }
+
+  /**
+   * Task to monitor metadata periodically. This task will detect changes in broker for partition
+   * and restart failed consumer threads for the partitions.
+   * Monitoring is disabled after metadataRefreshRetryLimit number of failure.
+   */
+  private class MetaDataMonitorTask implements Runnable {
+
+    private final SimpleKafkaConsumer ref;
+
+    private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
+
+    private MetaDataMonitorTask(SimpleKafkaConsumer ref) {
+      this.ref = ref;
+    }
+
+    @Override public void run() {
+      try {
+        monitorMetadata();
+        monitorException.set(null);
+        monitorExceptionCount.set(0);
+      } catch (Throwable ex) {
+        logger.error("Exception {}", ex);
+        monitorException.set(ex);
+        monitorExceptionCount.incrementAndGet();
+      }
+    }
+
+    /**
+     * Monitor kafka topic metadata changes.
+     */
+    private void monitorMetadata()
+    {
+      if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
+        logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
+        Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
+        if (pms == null) {
+          // retrieve metadata fail add retry count and return
+          retryCounter.getAndAdd(1);
+          return;
+        }
+
+        for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
+          if (pmLEntry.getValue() == null)
+            continue;
+          for (PartitionMetadata pm : pmLEntry.getValue()) {
+            KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
+            if (!kps.contains(kp)) {
+              // Out of this consumer's scope
+              continue;
+            }
+            Broker b = pm.leader();
+            Broker oldB = partitionToBroker.put(kp, b);
+            if (b.equals(oldB)) {
+              continue;
+            }
+            // add to positive
+            deltaPositive.put(b, kp);
+
+            // always update the latest connection information
+            stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
+          }
+        }
+
+        // remove from map if the thread is done (partitions on this broker has all been reassigned to others(or temporarily not available) for
+        // example)
+        for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext(); ) {
+          Entry<Broker, ConsumerThread> item = iterator.next();
+          if (item.getValue().getThreadItSelf().isDone()) {
+            iterator.remove();
+          }
+        }
+
+        for (Broker b : deltaPositive.keySet()) {
+          if (!simpleConsumerThreads.containsKey(b)) {
+            // start thread for new broker
+            ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
+            ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
+            simpleConsumerThreads.put(b, ct);
+
+          } else {
+            simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
+          }
+        }
+
+        deltaPositive.clear();
+
+        // reset to 0 if it reconnect to the broker which has current broker metadata
+        retryCounter.set(0);
+      }
+    }
+
+  }
 } // End of SimpleKafkaConsumer