blob: a6c7ade41fcbba733b2e21a4376a3dcfc2c4f5a7 [file] [log] [blame]
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package com.datatorrent.contrib.kafka;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
* Simple Kafka consumer adaptor used by the Kafka input operator. Properties:<br>
* <b>timeout</b>: Timeout for connection and ping <br>
* <b>bufferSize</b>: buffer size of the consumer <br>
* <b>clientId</b>: client id of the consumer <br>
* <b>partitionIds</b>: ids of the partitions that the consumer wants to consume <br>
* <li>(-1): create as many threads and consumers as there are partitions, to read the topic from different partitions in parallel</li> <br>
* <b>metadataRefreshInterval</b>: The interval at which the monitor thread checks for broker leadership changes <br>
* <b>metadataRetrievalRetry</b>: Maximum number of retries for metadata retrieval failures<br>
* default value is 3 <br>
* -1: unlimited retry <br>
* <br>
* Load balance: <br>
* <li>The consumer creates several data-consuming threads to consume the data from the broker(s)</li>
* <li>Each thread has only ONE Kafka client, connected to ONE broker, from which it consumes data for multiple partitions </li>
* <li>
* There is ONE separate thread that monitors the leadership of all the partitions of the topic every
* #metadataRefreshInterval milliseconds</li>
* <li>Once a leadership
* change is detected (leader broker failure, or server-side reassignment), it switches to the new leader broker</li> <li>
* For server-side leadership change, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li> <br>
* <br>
* Kafka broker failover: <br>
* <li>Once a broker failure is detected, it waits #metadataRefreshInterval to reconnect to the new leader broker</li> <li>
* If there are #metadataRetrievalRetry consecutive failures to retrieve the metadata for the topic, it will stop
* consuming the partition</li> <br>
* @since 0.9.0
public class SimpleKafkaConsumer extends KafkaConsumer
* The data-consuming thread that uses one simple Kafka client to connect to one broker, which is the leader of the partition(s) that this consumer is interested in
// Runnable that fetches data for a set of partitions through a single SimpleConsumer connection to one broker.
static final class ConsumerThread implements Runnable
// broker this thread reads from; its port is also used when building the client name
private final Broker broker;
// client name obtained from consumer.getClientName(...) in the constructor and passed to every request
private final String clientName;
// kafka simple consumer object
private SimpleConsumer ksc;
// The SimpleKafkaConsumer which holds this thread
private SimpleKafkaConsumer consumer;
// partitions consumed in this thread
private final Set<KafkaPartition> kpS;
// handle exposed through getThreadItSelf()/setThreadItSelf(); the metadata monitor calls isDone() on it
private Future threadItSelf;
/**
 * Creates a consumer thread bound to one broker.
 *
 * @param broker the broker to read from; its port becomes part of the client name
 * @param kpl partitions to consume. NOTE(review): not referenced on any visible line of this
 *        constructor; confirm that they are copied into kpS
 * @param consumer the owning SimpleKafkaConsumer, which supplies the timeout, buffer size, topic and
 *        offsets used by run()
 */
private ConsumerThread(Broker broker, Set<KafkaPartition> kpl, SimpleKafkaConsumer consumer)
{ = broker;
this.clientName = consumer.getClientName( + "_" + broker.port());
this.consumer = consumer;
// concurrent set view backed by a ConcurrentHashMap
this.kpS = Collections.newSetFromMap(new ConcurrentHashMap<KafkaPartition, Boolean>());
/**
 * Main loop of the consuming thread: opens a SimpleConsumer to the broker, makes sure every assigned
 * partition has a start offset in the owning consumer's offsetTrack, then repeatedly issues one fetch
 * request covering all assigned partitions and hands the messages to the owning consumer.
 * The loop ends when the owning consumer is no longer alive or when its retry counter reaches the
 * metadata refresh retry limit (a limit of -1 disables that check). Any exception is logged and ends
 * the thread.
 */
public void run()
try {"Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", broker, consumer.timeout, consumer.bufferSize, clientName);
ksc = new SimpleConsumer(, broker.port(), consumer.timeout, consumer.bufferSize, clientName);
// Initialize all start offsets for all kafka partitions read from this consumer
// read either from beginning of the broker or last offset committed by the operator
for (KafkaPartition kpForConsumer : kpS) {"Start consuming data of topic {} ", kpForConsumer);
if (consumer.offsetTrack.get(kpForConsumer) != null) {
// start from recovery
// offsets.put(kpForConsumer, offsetTrack.get(kpForConsumer));"Partition {} initial offset {}", kpForConsumer, consumer.offsetTrack.get(kpForConsumer));
} else {
// no tracked offset yet: ask the broker for the earliest or latest offset, depending on consumer.initialOffset
long startOffsetReq = consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();"Partition {} initial offset {} {}", kpForConsumer.getPartitionId(), startOffsetReq, consumer.initialOffset);
consumer.offsetTrack.put(kpForConsumer, KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kpForConsumer.getPartitionId(), startOffsetReq, clientName));
// stop consuming only when the consumer container is stopped or the metadata can not be refreshed
while (consumer.isAlive && (consumer.metadataRefreshRetryLimit == -1 || consumer.retryCounter.get() < consumer.metadataRefreshRetryLimit)) {
// nothing assigned to this thread (any more)
if (kpS == null || kpS.isEmpty()) {
FetchRequestBuilder frb = new FetchRequestBuilder().clientId(clientName);
// add all partition requests to one fetch request
for (KafkaPartition kpForConsumer : kpS) {
frb.addFetch(consumer.topic, kpForConsumer.getPartitionId(), consumer.offsetTrack.get(kpForConsumer), consumer.bufferSize);
FetchRequest req =;
if (ksc == null) {
if (consumer.metadataRefreshInterval > 0) {
// wait one metadata refresh interval plus one second
Thread.sleep(consumer.metadataRefreshInterval + 1000);
} else {
FetchResponse fetchResponse = ksc.fetch(req);
for (Iterator<KafkaPartition> iterator = kpS.iterator(); iterator.hasNext();) {
KafkaPartition kafkaPartition =;
if (fetchResponse.hasError() && fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != ErrorMapping.NoError()) {
// Kick off partition(s) which has error when fetch from this broker temporarily
// Monitor will find out which broker it goes in monitor thread
logger.warn("Error when consuming topic {} from broker {} with error code {} ", kafkaPartition, broker, fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
// record the partition with broker id -1 and an empty connection string
consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
// If the fetchResponse either has no error or the no error for $kafkaPartition get the data
long offset = -1l;
for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) {
offset = msg.nextOffset();
consumer.putMessage(kafkaPartition, msg.message(), msg.offset());
// only advance the tracked offset when at least one message was read
if (offset != -1) {
consumer.offsetTrack.put(kafkaPartition, offset);
} catch (Exception e){
logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {} \n Caused by {}", broker, e);
} finally {
if (ksc != null) {
for (KafkaPartition kpForConsumer : kpS) {
// Update consumer that these partitions are currently stop being consumed because of some unrecoverable exception
}"Exit the consumer thread for broker {} ", broker);
/**
 * Hands additional partitions to this already existing consumer thread.
 *
 * @param newKps partitions that are now assigned to this thread's broker
 */
public void addPartitions(Set<KafkaPartition> newKps)
// Add the partition(s) that are assigned to this broker to this existing consumer thread
/**
 * @return the Future stored by setThreadItSelf(), or null if none has been set
 */
public Future getThreadItSelf()
return threadItSelf;
/**
 * Stores the Future associated with this thread.
 *
 * @param threadItSelf the Future to keep; later returned by getThreadItSelf()
 */
public void setThreadItSelf(@SuppressWarnings("rawtypes") Future threadItSelf)
this.threadItSelf = threadItSelf;
/**
 * No-argument constructor.
 * NOTE(review): no body is visible in this listing; presumably the fields keep their declared
 * initial values. Confirm against the full source.
 */
public SimpleKafkaConsumer()
/**
 * Convenience constructor that delegates to the five-argument constructor with null partition ids.
 *
 * @param topic topic to consume
 * @param timeout value stored in the timeout field
 * @param bufferSize value stored in the bufferSize field
 * @param clientId client id prefix
 */
public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId)
this(topic, timeout, bufferSize, clientId, null);
/**
 * Creates a consumer with explicit connection settings and partitions.
 *
 * @param topic topic to consume. NOTE(review): not used on any visible line of this constructor;
 *        presumably passed to the superclass. Confirm.
 * @param timeout value stored in the timeout field
 * @param bufferSize value stored in the bufferSize field
 * @param clientId client id prefix
 * @param partitionIds partitions to consume; may be null (create() replaces a null set with an empty one)
 */
public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
this.timeout = timeout;
this.bufferSize = bufferSize;
this.clientId = clientId;
this.kps = partitionIds;
/**
 * Creates a consumer with explicit connection settings and partitions, passing zks and topic to the
 * superclass constructor.
 *
 * @param zks multimap handed to the superclass. NOTE(review): presumably cluster id to zookeeper
 *        addresses; confirm against KafkaConsumer.
 * @param topic topic to consume
 * @param timeout value stored in the timeout field
 * @param bufferSize value stored in the bufferSize field
 * @param clientId client id prefix
 * @param partitionIds partitions to consume; may be null (create() replaces a null set with an empty one)
 */
public SimpleKafkaConsumer(SetMultimap<String, String> zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
super(zks, topic);
this.timeout = timeout;
this.bufferSize = bufferSize;
this.clientId = clientId;
this.kps = partitionIds;
private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
* Track consumers connected to each broker, topics and partitions hosted on same broker are consumed by same
* consumer. Clean the resource if necessary. Key is the kafka broker object.
private final transient Map<Broker, ConsumerThread> simpleConsumerThreads = new HashMap<Broker, ConsumerThread>();
// executor for the data-consuming threads; created in start() as a cached thread pool
private transient ExecutorService kafkaConsumerExecutor;
// scheduler with one thread that runs the metadata monitor task; created in start()
private transient ScheduledExecutorService metadataRefreshExecutor;
* The metadata refresh retry counter
private final transient AtomicInteger retryCounter = new AtomicInteger(0);
// timeout handed to the SimpleConsumer constructor (presumably milliseconds; confirm)
private int timeout = 10000;
* Default buffer size is 1M
private int bufferSize = 1024 * 1024;
* Default client id prefix is "Kafka_Simple_Client"
private String clientId = "Kafka_Simple_Client";
* Interval between metadata refreshes (checks for broker changes), in milliseconds. Metadata refresh guarantees
* automatic reconnection to a broker that is newly elected as leader. Disable metadata refresh by setting this
* to -1. WARN: Turning off the refresh will disable auto reconnect to the new broker
private int metadataRefreshInterval = 30000;
* Maximum brokers' metadata refresh retry limit. -1 means unlimited retry
private int metadataRefreshRetryLimit = -1;
* You can set up the particular Kafka partitions you want this consumer client to consume. This can be used to
* share clients and threads and maximize the overall performance. Null or empty value: the consumer will create as
* many threads and clients as there are brokers hosting the partitions of the topic. Each thread consumes 1(+) partitions
* from 1 broker
private Set<KafkaPartition> kps = new HashSet<KafkaPartition>();
// This map maintains the mapping between a Kafka partition and its leader broker, kept up to date by the monitor thread
private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
* Track the offset for each partition, so the operator can start from the last serialized state. Uses ConcurrentHashMap to
* avoid ConcurrentModificationException without blocking reads when updating from another thread (unlike Hashtable or
* a synchronized map)
private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
// Throwable holder returned by getMonitorException(); assigned in start() with an initial value of null.
// NOTE(review): presumably set when the metadata monitor task fails; the writing code is not visible here.
private transient AtomicReference<Throwable> monitorException;
// counter returned by getMonitorExceptionCount(); assigned in start() with an initial value of 0
private transient AtomicInteger monitorExceptionCount;
/**
 * Prepares the consumer: looks up the partition metadata of the topic and makes sure kps is non-null.
 * NOTE(review): several lines of this method are missing from this listing. Judging by the remaining
 * comment, when no partitions were configured the KafkaPartition objects built from the metadata are
 * presumably added to kps; confirm against the full source.
 */
public void create()
Map<String, List<PartitionMetadata>> partitionMetas = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
// a null partition set is treated like an empty one
if (kps == null) {
kps = new HashSet<KafkaPartition>();
if (kps.size() != 0) {
// if partition ids are null or not specified , find all the partition metadata for
// the specific topic from broker
for (Entry<String, List<PartitionMetadata>> en : partitionMetas.entrySet()) {
// the map key is used as the cluster id of every partition in the entry
String clusterId = en.getKey();
for (PartitionMetadata part : en.getValue()) {
KafkaPartition kp = new KafkaPartition(clusterId, topic, part.partitionId());
/**
 * Starts the consumer: resets the monitor exception holders, creates the cached thread pool for the
 * data-consuming threads and schedules the metadata monitor task at a fixed rate of
 * metadataRefreshInterval milliseconds with no initial delay.
 * NOTE(review): the body of the metadataRefreshInterval <= 0 branch is not visible here; presumably it
 * returns without starting the monitor, since a non-positive period cannot be scheduled. Confirm.
 */
public void start()
monitorException = new AtomicReference<Throwable>(null);
monitorExceptionCount = new AtomicInteger(0);
// thread to consume the kafka data
kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());
if(metadataRefreshInterval <= 0) {
// background thread to monitor the kafka metadata change
metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
// start one monitor thread to monitor the leader broker change and trigger some action
metadataRefreshExecutor.scheduleAtFixedRate(new MetaDataMonitorTask(this) , 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
/**
 * Stops the consumer by visiting every registered ConsumerThread.
 * NOTE(review): the loop body and any executor shutdown are not visible in this listing; confirm what
 * is done per thread and whether both executors are shut down.
 */
public void close()
{"Stop all consumer threads");
for (ConsumerThread ct : simpleConsumerThreads.values()) {
/**
 * @param bufferSize new value for the bufferSize field, which ConsumerThread passes to the
 *        SimpleConsumer constructor and to each fetch request
 */
public void setBufferSize(int bufferSize)
this.bufferSize = bufferSize;
/**
 * @param clientId new client id prefix; getClientName() prepends it to the per-broker client name
 */
public void setClientId(String clientId)
this.clientId = clientId;
/**
 * @param timeout new value for the timeout field, which ConsumerThread passes to the SimpleConsumer
 *        constructor
 */
public void setTimeout(int timeout)
this.timeout = timeout;
/**
 * @return the current value of the bufferSize field
 */
public int getBufferSize()
return bufferSize;
/**
 * @return the client id prefix
 */
public String getClientId()
return clientId;
/**
 * @return the current value of the timeout field
 */
public int getTimeout()
return timeout;
/**
 * @return the metadata refresh interval in milliseconds
 */
public int getMetadataRefreshInterval()
return metadataRefreshInterval;
/**
 * @param reconnectInterval new metadata refresh interval in milliseconds; start() uses it as the
 *        period of the metadata monitor task
 */
public void setMetadataRefreshInterval(int reconnectInterval)
this.metadataRefreshInterval = reconnectInterval;
/**
 * @return the metadata refresh retry limit; -1 means the retry counter is not checked
 */
public int getMetadataRefreshRetryLimit()
return metadataRefreshRetryLimit;
/**
 * @param metadataRefreshRetryLimit new retry limit; -1 means the retry counter is not checked
 */
public void setMetadataRefreshRetryLimit(int metadataRefreshRetryLimit)
this.metadataRefreshRetryLimit = metadataRefreshRetryLimit;
/**
 * Commit hook for offsets. No statements are visible in this listing; the comments below state that
 * the offsets of the simple consumer live in offsetTrack rather than being registered with a server.
 */
protected void commitOffset()
// the simple consumer offset is kept in the offsetTrack
// It's better to do server registry for client in the future. Wait for kafka community come up with more
// sophisticated offset management
/**
 * Builds the client name used for one broker connection.
 *
 * @param brokerName broker-specific suffix supplied by the caller
 * @return clientId, followed by SIMPLE_CONSUMER_ID_SUFFIX, followed by brokerName
 */
private String getClientName(String brokerName)
return clientId + SIMPLE_CONSUMER_ID_SUFFIX + brokerName;
/**
 * @return the offsetTrack map itself (not a copy), so later offset updates are visible through it
 */
protected Map<KafkaPartition, Long> getCurrentOffsets()
return offsetTrack;
/**
 * Overrides the tracked offsets of the partitions assigned to this consumer.
 * Partitions in kps without an entry in overrideOffset keep their current offset; entries for
 * partitions outside kps are ignored.
 *
 * @param overrideOffset offsets to apply, keyed by partition. NOTE(review): the body of the null
 *        check is not visible here; presumably the method returns without changes. Confirm.
 */
public void resetOffset(Map<KafkaPartition, Long> overrideOffset)
if (overrideOffset == null) {
// set offset of the partitions assigned to this consumer
for (KafkaPartition kp : kps) {
Long offsetForPar = overrideOffset.get(kp);
if (offsetForPar != null) {
offsetTrack.put(kp, offsetForPar);
/**
 * Returns the statistics provided by the superclass.
 *
 * @param offsetStats not used on the visible lines of this method
 * @return the result of super.getConsumerStats()
 */
public KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> offsetStats)
return super.getConsumerStats();
/**
 * Replaces the set of partitions this consumer is responsible for.
 *
 * @param partitionIds new partition set, stored in kps as-is (no copy)
 * @param startOffset NOTE(review): not used on the visible lines of this method; presumably applied
 *        to the offset tracking on lines missing from this listing. Confirm.
 */
protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
this.kps = partitionIds;
/**
 * @return the Throwable currently held by monitorException, or null if none is stored.
 *         In the visible code monitorException is only assigned in start(), so calling this before
 *         start() would dereference null.
 */
protected Throwable getMonitorException()
return monitorException.get();
/**
 * @return the current value of monitorExceptionCount.
 *         In the visible code monitorExceptionCount is only assigned in start(), so calling this
 *         before start() would dereference null.
 */
protected int getMonitorExceptionCount()
return monitorExceptionCount.get();
* Task to monitor metadata periodically. This task will detect changes in broker for partition
* and restart failed consumer threads for the partitions.
* Monitoring is disabled after metadataRefreshRetryLimit failures.
// Runnable scheduled by start() on the metadata refresh executor.
// NOTE(review): the statements inside the try block of run() are not visible in this listing;
// presumably it calls monitorMetadata(). Confirm.
private class MetaDataMonitorTask implements Runnable {
// consumer handed to every ConsumerThread created by monitorMetadata()
private final SimpleKafkaConsumer ref;
// partitions collected by monitorMetadata(), grouped by their leader broker
private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
private MetaDataMonitorTask(SimpleKafkaConsumer ref) {
this.ref = ref;
@Override public void run() {
try {
// catches Throwable, so nothing thrown by the task body escapes run()
} catch (Throwable ex) {
logger.error("Exception {}", ex);
* Monitor kafka topic metadata changes.
/**
 * Performs one monitoring pass: re-reads the partition metadata of the topic, records the current
 * leader of each partition in kps, updates the connection statistics, and creates and registers a
 * ConsumerThread for every broker in deltaPositive that has no thread yet.
 * Does nothing unless the consumer is alive and the retry counter is below the retry limit (or the
 * limit is -1).
 * NOTE(review): several statements (the bodies of the guard branches, the removal of finished
 * threads, the submission of new threads and the retry counter updates) are missing from this
 * listing.
 */
private void monitorMetadata()
if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
if (pms == null) {
// retrieve metadata fail add retry count and return
for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
if (pmLEntry.getValue() == null)
for (PartitionMetadata pm : pmLEntry.getValue()) {
// the map key is used as the cluster id of the partition
KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
if (!kps.contains(kp)) {
// Out of this consumer's scope
// NOTE(review): if pm.leader() can be null (partition without a leader), both the put and the
// equals call below would throw; confirm how that case is handled
Broker b = pm.leader();
// put() returns the previously recorded leader, or null if there was none
Broker oldB = partitionToBroker.put(kp, b);
if (b.equals(oldB)) {
// add to positive
deltaPositive.put(b, kp);
// always update the latest connection information
stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
// remove from map if the thread is done (partitions on this broker has all been reassigned to others(or temporarily not available) for
// example)
for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext(); ) {
Entry<Broker, ConsumerThread> item =;
if (item.getValue().getThreadItSelf().isDone()) {
for (Broker b : deltaPositive.keySet()) {
if (!simpleConsumerThreads.containsKey(b)) {
// start thread for new broker
ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
simpleConsumerThreads.put(b, ct);
} else {
// reset to 0 if it reconnect to the broker which has current broker metadata
} // End of SimpleKafkaConsumer