Fix NPE in monitor thread when kafka cluster becomes unavailable.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 3ad9f37..a6c7ade 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -30,6 +30,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
@@ -298,6 +299,9 @@
*/
private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
+ private transient AtomicReference<Throwable> monitorException;
+ private transient AtomicInteger monitorExceptionCount;
+
@Override
public void create()
{
@@ -325,6 +329,8 @@
@Override
public void start()
{
+ monitorException = new AtomicReference<Throwable>(null);
+ monitorExceptionCount = new AtomicInteger(0);
super.start();
// thread to consume the kafka data
@@ -338,71 +344,7 @@
metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
// start one monitor thread to monitor the leader broker change and trigger some action
- final SimpleKafkaConsumer ref = this;
- metadataRefreshExecutor.scheduleAtFixedRate(new Runnable() {
-
- private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
-
- @Override
- public void run()
- {
- if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
- logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
- Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
- if (pms == null) {
- // retrieve metadata fail add retry count and return
- retryCounter.getAndAdd(1);
- return;
- }
-
- for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
- for (PartitionMetadata pm : pmLEntry.getValue()) {
- KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
- if (!kps.contains(kp)) {
- // Out of this consumer's scope
- continue;
- }
- Broker b = pm.leader();
- Broker oldB = partitionToBroker.put(kp, b);
- if(b.equals(oldB)) {
- continue;
- }
- // add to positive
- deltaPositive.put(b,kp);
-
- // always update the latest connection information
- stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
- }
- }
-
- // remove from map if the thread is done (partitions on this broker has all been reassigned to others(or temporarily not available) for
- // example)
- for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext();) {
- Entry<Broker, ConsumerThread> item = iterator.next();
- if (item.getValue().getThreadItSelf().isDone()) {
- iterator.remove();
- }
- }
-
- for (Broker b : deltaPositive.keySet()) {
- if (!simpleConsumerThreads.containsKey(b)) {
- // start thread for new broker
- ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
- ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
- simpleConsumerThreads.put(b, ct);
-
- } else {
- simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
- }
- }
-
- deltaPositive.clear();
-
- // reset to 0 if it reconnect to the broker which has current broker metadata
- retryCounter.set(0);
- }
- }
- }, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
+ metadataRefreshExecutor.scheduleAtFixedRate(new MetaDataMonitorTask(this) , 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
}
@Override
@@ -514,4 +456,107 @@
this.kps = partitionIds;
resetOffset(startOffset);
}
+
+ protected Throwable getMonitorException()
+ {
+ return monitorException.get();
+ }
+
+ protected int getMonitorExceptionCount()
+ {
+ return monitorExceptionCount.get();
+ }
+
+ /**
+ * Task to monitor topic metadata periodically. This task detects leader-broker changes for the
+ * partitions assigned to this consumer and restarts failed consumer threads for those partitions.
+ * Monitoring is disabled after metadataRefreshRetryLimit consecutive failures.
+ */
+ private class MetaDataMonitorTask implements Runnable {
+
+ private final SimpleKafkaConsumer ref;
+
+ private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
+
+ private MetaDataMonitorTask(SimpleKafkaConsumer ref) {
+ this.ref = ref;
+ }
+
+ @Override public void run() {
+ try {
+ monitorMetadata();
+ monitorException.set(null);
+ monitorExceptionCount.set(0);
+ } catch (Throwable ex) {
+ logger.error("Exception in metadata monitor for topic {}", topic, ex);
+ monitorException.set(ex);
+ monitorExceptionCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * Monitor kafka topic metadata changes.
+ */
+ private void monitorMetadata()
+ {
+ if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
+ logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
+ Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
+ if (pms == null) {
+ // retrieve metadata fail add retry count and return
+ retryCounter.getAndAdd(1);
+ return;
+ }
+
+ for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
+ if (pmLEntry.getValue() == null)
+ continue;
+ for (PartitionMetadata pm : pmLEntry.getValue()) {
+ KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
+ if (!kps.contains(kp)) {
+ // Out of this consumer's scope
+ continue;
+ }
+ Broker b = pm.leader();
+ Broker oldB = partitionToBroker.put(kp, b);
+ if (b.equals(oldB)) {
+ continue;
+ }
+ // add to positive
+ deltaPositive.put(b, kp);
+
+ // always update the latest connection information
+ stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
+ }
+ }
+
+ // remove from map if the thread is done (partitions on this broker has all been reassigned to others(or temporarily not available) for
+ // example)
+ for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext(); ) {
+ Entry<Broker, ConsumerThread> item = iterator.next();
+ if (item.getValue().getThreadItSelf().isDone()) {
+ iterator.remove();
+ }
+ }
+
+ for (Broker b : deltaPositive.keySet()) {
+ if (!simpleConsumerThreads.containsKey(b)) {
+ // start thread for new broker
+ ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
+ ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
+ simpleConsumerThreads.put(b, ct);
+
+ } else {
+ simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
+ }
+ }
+
+ deltaPositive.clear();
+
+ // reset to 0 if it reconnect to the broker which has current broker metadata
+ retryCounter.set(0);
+ }
+ }
+
+ }
} // End of SimpleKafkaConsumer