/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.kafka;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
/**
* Simple kafka consumer adaptor used by the kafka input operator. Properties:<br>
* <b>timeout</b>: timeout for connection and ping <br>
* <b>bufferSize</b>: buffer size of the consumer <br>
* <b>clientId</b>: client id of the consumer <br>
* <b>partitionIds</b>: partition ids that the consumer wants to consume <br>
* <li>(-1): create #partition threads and consumers to read the topic from different partitions in parallel</li> <br>
* <b>metadataRefreshInterval</b>: interval, in milliseconds, at which the monitor thread checks for broker leadership changes <br>
* <b>metadataRefreshRetryLimit</b>: maximum number of retries for metadata retrieval failures<br>
* default value is -1 <br>
* -1: unlimited retries <br>
* <br>
*
* Load balance: <br>
* <li>The consumer creates several data-consuming threads to consume the data from the broker(s)</li>
* <li>Each thread has exactly ONE kafka client connecting to ONE broker to consume data from, for multiple partitions</li>
* <li>There is ONE separate thread that monitors the leadership of all the partitions of the topic every
* #metadataRefreshInterval milliseconds</li>
* <li>Once a leadership change is detected (leader broker failure, or server-side reassignment), the consumer switches to the new leader broker</li>
* <li>For server-side leadership changes, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li> <br>
* <br>
* Kafka broker failover: <br>
* <li>Once a broker failure is detected, the consumer waits #metadataRefreshInterval before reconnecting to the new leader broker</li>
* <li>After #metadataRefreshRetryLimit consecutive failures to retrieve the metadata for the topic, it stops
* consuming the partition</li> <br>
*
*
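* <p>
* A minimal usage sketch (illustrative only: the topic name, timeout, buffer size and client id below are placeholder
* values, the broker/zookeeper configuration inherited from the base {@link KafkaConsumer} is assumed to be set before
* {@code start()} is called, and in practice this consumer is created and managed by the kafka input operator):
* </p>
* <pre>{@code
* Set<KafkaPartition> partitions = new HashSet<KafkaPartition>(); // empty set: consume all partitions
* SimpleKafkaConsumer consumer =
*     new SimpleKafkaConsumer("myTopic", 10000, 64 * 1024, "myClient", partitions);
* consumer.setMetadataRefreshInterval(30000); // check for leader broker changes every 30 seconds
* consumer.start();
* // ... messages are buffered internally and drained by the kafka input operator ...
* consumer.close();
* }</pre>
*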
* @since 0.9.0
*/
public class SimpleKafkaConsumer extends KafkaConsumer
{
/**
* The data-consuming thread. It uses one simple kafka client to connect to the one broker that is the leader of the partition(s) this consumer thread is interested in
*/
static final class ConsumerThread implements Runnable
{
private final Broker broker;
private final String clientName;
// kafka simple consumer object
private SimpleConsumer ksc;
// The SimpleKafkaConsumer which holds this thread
private SimpleKafkaConsumer consumer;
// partitions consumed in this thread
private final Set<KafkaPartition> kpS;
@SuppressWarnings("rawtypes")
private Future threadItSelf;
private ConsumerThread(Broker broker, Set<KafkaPartition> kpl, SimpleKafkaConsumer consumer)
{
this.broker = broker;
this.clientName = consumer.getClientName(broker.host() + "_" + broker.port());
this.consumer = consumer;
this.kpS = Collections.newSetFromMap(new ConcurrentHashMap<KafkaPartition, Boolean>());
this.kpS.addAll(kpl);
}
@Override
public void run()
{
try {
logger.info("Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", broker, consumer.timeout, consumer.bufferSize, clientName);
ksc = new SimpleConsumer(broker.host(), broker.port(), consumer.timeout, consumer.bufferSize, clientName);
// Initialize the start offset for every kafka partition read by this consumer thread,
// resuming from the offset recovered by the operator or starting from the configured initial offset
for (KafkaPartition kpForConsumer : kpS) {
logger.info("Start consuming data for partition {} ", kpForConsumer);
if (consumer.offsetTrack.get(kpForConsumer) != null) {
// start from recovery
// offsets.put(kpForConsumer, offsetTrack.get(kpForConsumer));
logger.info("Partition {} initial offset {}", kpForConsumer, consumer.offsetTrack.get(kpForConsumer));
} else {
long startOffsetReq = consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
logger.info("Partition {} initial offset {} {}", kpForConsumer.getPartitionId(), startOffsetReq, consumer.initialOffset);
consumer.offsetTrack.put(kpForConsumer, KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kpForConsumer.getPartitionId(), startOffsetReq, clientName));
}
}
// stop consuming only when the consumer container is stopped or the metadata can not be refreshed
while (consumer.isAlive && (consumer.metadataRefreshRetryLimit == -1 || consumer.retryCounter.get() < consumer.metadataRefreshRetryLimit)) {
if (kpS == null || kpS.isEmpty()) {
return;
}
FetchRequestBuilder frb = new FetchRequestBuilder().clientId(clientName);
// add all partition requests together in one fetch request
for (KafkaPartition kpForConsumer : kpS) {
frb.addFetch(consumer.topic, kpForConsumer.getPartitionId(), consumer.offsetTrack.get(kpForConsumer), consumer.bufferSize);
}
FetchRequest req = frb.build();
if (ksc == null) {
if (consumer.metadataRefreshInterval > 0) {
Thread.sleep(consumer.metadataRefreshInterval + 1000);
} else {
Thread.sleep(100);
}
}
FetchResponse fetchResponse = ksc.fetch(req);
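// process the fetch response for every partition served by this thread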
for (Iterator<KafkaPartition> iterator = kpS.iterator(); iterator.hasNext();) {
KafkaPartition kafkaPartition = iterator.next();
short errorCode = fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId());
if (fetchResponse.hasError() && errorCode != ErrorMapping.NoError()) {
// Temporarily kick out the partition(s) that hit an error when fetching from this broker
// The monitor thread will find out which broker they have moved to
logger.warn("Error when consuming partition {} from broker {} with error {} ", kafkaPartition, broker,
ErrorMapping.exceptionFor(errorCode));
if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
long seekTo = consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime()
: OffsetRequest.LatestTime();
seekTo = KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kafkaPartition.getPartitionId(), seekTo, clientName);
logger.warn("Offset out of range error, reset offset to {}", seekTo);
consumer.offsetTrack.put(kafkaPartition, seekTo);
continue;
}
iterator.remove();
consumer.partitionToBroker.remove(kafkaPartition);
consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
continue;
}
// The fetch response either has no error, or has no error for this particular partition; consume the data
long offset = -1l;
for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) {
offset = msg.nextOffset();
consumer.putMessage(kafkaPartition, msg.message(), msg.offset());
}
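// advance the tracked offset only if at least one message was fetched for this partition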
if (offset != -1) {
consumer.offsetTrack.put(kafkaPartition, offset);
}
}
}
} catch (Exception e){
logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {} \n Caused by {}", broker, e);
} finally {
if (ksc != null) {
ksc.close();
}
for (KafkaPartition kpForConsumer : kpS) {
// Tell the consumer that these partitions are no longer being consumed because of an unrecoverable exception
consumer.partitionToBroker.remove(kpForConsumer);
}
logger.info("Exit the consumer thread for broker {} ", broker);
}
}
public void addPartitions(Set<KafkaPartition> newKps)
{
// Add the partition(s) assigned to this broker to this existing consumer thread
kpS.addAll(newKps);
}
@SuppressWarnings("rawtypes")
public Future getThreadItSelf()
{
return threadItSelf;
}
public void setThreadItSelf(@SuppressWarnings("rawtypes") Future threadItSelf)
{
this.threadItSelf = threadItSelf;
}
}
public SimpleKafkaConsumer()
{
super();
}
public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId)
{
this(topic, timeout, bufferSize, clientId, null);
}
public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
{
super(topic);
this.timeout = timeout;
this.bufferSize = bufferSize;
this.clientId = clientId;
this.kps = partitionIds;
}
public SimpleKafkaConsumer(String zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
{
super(zks, topic);
this.timeout = timeout;
this.bufferSize = bufferSize;
this.clientId = clientId;
this.kps = partitionIds;
}
private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
/**
* Track the consumer thread connected to each broker; topics and partitions hosted on the same broker are consumed by
* the same consumer thread. Clean up the resources when necessary. Key is the kafka broker object.
*/
private final transient Map<Broker, ConsumerThread> simpleConsumerThreads = new HashMap<Broker, ConsumerThread>();
private transient ExecutorService kafkaConsumerExecutor;
private transient ScheduledExecutorService metadataRefreshExecutor;
/**
* The metadata refresh retry counter
*/
private final transient AtomicInteger retryCounter = new AtomicInteger(0);
private int timeout = 10000;
/**
* Default buffer size is 1M
*/
private int bufferSize = 1024 * 1024;
/**
* Default client id prefix is "Kafka_Simple_Client"
*/
@NotNull
private String clientId = "Kafka_Simple_Client";
/**
* Interval, in milliseconds, between refreshes of the metadata (broker changes). Metadata refresh guarantees automatic
* reconnection to a newly elected leader broker. Disable metadata refresh by setting this to -1.
* WARN: turning off the refresh disables automatic reconnection to new brokers
*/
private int metadataRefreshInterval = 30000;
/**
* Maximum brokers' metadata refresh retry limit. -1 means unlimited retry
*/
private int metadataRefreshRetryLimit = -1;
/**
* The particular kafka partitions you want this consumer client to consume. This can be used to share clients and
* threads and maximize the overall performance. Null or empty value: the consumer creates as many threads and clients
* as there are brokers hosting the partitions of the topic. Each thread consumes 1(+) partitions
* from 1 broker
*/
private Set<KafkaPartition> kps = new HashSet<KafkaPartition>();
// This map maintains the mapping between each kafka partition and its leader broker, kept up to date in real time by the monitor thread
private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
/**
* Track the offset for each partition, so the operator can restart from the last serialized state. A ConcurrentHashMap
* is used to avoid ConcurrentModificationException without blocking reads when another thread updates the map
* (unlike Hashtable or a synchronized map)
*/
private final transient ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
private transient AtomicReference<Throwable> monitorException;
private transient AtomicInteger monitorExceptionCount;
@Override
public void start()
{
monitorException = new AtomicReference<Throwable>(null);
monitorExceptionCount = new AtomicInteger(0);
super.start();
// thread to consume the kafka data
kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());
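// if metadata refresh is disabled or no partitions are assigned to this consumer, there is nothing to monitor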
if (metadataRefreshInterval <= 0 || CollectionUtils.isEmpty(kps)) {
return;
}
// background thread to monitor the kafka metadata change
metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
// start one monitor thread to monitor the leader broker change and trigger some action
metadataRefreshExecutor.scheduleAtFixedRate(new MetaDataMonitorTask(this) , 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
}
@Override
public void close()
{
logger.info("Stop all consumer threads");
for (ConsumerThread ct : simpleConsumerThreads.values()) {
ct.getThreadItSelf().cancel(true);
}
simpleConsumerThreads.clear();
metadataRefreshExecutor.shutdownNow();
kafkaConsumerExecutor.shutdownNow();
}
public void setBufferSize(int bufferSize)
{
this.bufferSize = bufferSize;
}
public void setClientId(String clientId)
{
this.clientId = clientId;
}
public void setTimeout(int timeout)
{
this.timeout = timeout;
}
public int getBufferSize()
{
return bufferSize;
}
public String getClientId()
{
return clientId;
}
public int getTimeout()
{
return timeout;
}
public int getMetadataRefreshInterval()
{
return metadataRefreshInterval;
}
public void setMetadataRefreshInterval(int reconnectInterval)
{
this.metadataRefreshInterval = reconnectInterval;
}
public int getMetadataRefreshRetryLimit()
{
return metadataRefreshRetryLimit;
}
public void setMetadataRefreshRetryLimit(int metadataRefreshRetryLimit)
{
this.metadataRefreshRetryLimit = metadataRefreshRetryLimit;
}
@Override
protected void commitOffset()
{
// the simple consumer offsets are kept in offsetTrack
// It would be better to register the client with the server in the future. Wait for the kafka community to come up
// with more sophisticated offset management
// TODO https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management#
}
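/**
* Build the per-broker client name from the configured client id
*/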
private String getClientName(String brokerName)
{
return clientId + SIMPLE_CONSUMER_ID_SUFFIX + brokerName;
}
@Override
protected Map<KafkaPartition, Long> getCurrentOffsets()
{
return offsetTrack;
}
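/**
* Reset the tracked offsets to the supplied values for the partitions assigned to this consumer. A null map is ignored
*/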
public void resetOffset(Map<KafkaPartition, Long> overrideOffset)
{
if (overrideOffset == null) {
return;
}
offsetTrack.clear();
// set offset of the partitions assigned to this consumer
for (KafkaPartition kp : kps) {
Long offsetForPar = overrideOffset.get(kp);
if (offsetForPar != null) {
offsetTrack.put(kp, offsetForPar);
}
}
}
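/**
* Update the consumer stats with the given partition offsets and return the aggregated stats
*/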
public KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> offsetStats)
{
stats.updateOffsets(offsetStats);
return super.getConsumerStats();
}
@Override
protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
{
this.kps = partitionIds;
resetOffset(startOffset);
}
protected Throwable getMonitorException()
{
return monitorException.get();
}
protected int getMonitorExceptionCount()
{
return monitorExceptionCount.get();
}
/**
* Task to monitor metadata periodically. This task detects leader broker changes for the partitions
* and restarts failed consumer threads for those partitions.
* Monitoring is disabled after metadataRefreshRetryLimit consecutive failures.
*/
private class MetaDataMonitorTask implements Runnable {
private final SimpleKafkaConsumer ref;
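// broker-to-partition assignments that are new or changed in the current refresh; used to start or extend consumer threads, then cleared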
private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
private MetaDataMonitorTask(SimpleKafkaConsumer ref) {
this.ref = ref;
}
@Override public void run() {
try {
monitorMetadata();
monitorException.set(null);
monitorExceptionCount.set(0);
} catch (Throwable ex) {
logger.error("Exception {}", ex);
monitorException.set(ex);
monitorExceptionCount.incrementAndGet();
}
}
/**
* Monitor kafka topic metadata changes.
*/
private void monitorMetadata()
{
if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
if (pms == null) {
// metadata retrieval failed; increment the retry count and return
retryCounter.getAndAdd(1);
return;
}
for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
if (pmLEntry.getValue() == null)
continue;
for (PartitionMetadata pm : pmLEntry.getValue()) {
KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
if (!kps.contains(kp)) {
// Out of this consumer's scope
continue;
}
Broker b = pm.leader();
if (b == null) {
logger.info("No Leader broker for Kafka Partition {}. Skipping it for time until new leader is elected", kp.getPartitionId());
continue;
}
Broker oldB = partitionToBroker.put(kp, b);
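// the leader broker for this partition is unchanged; nothing to reassign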
if (b.equals(oldB)) {
continue;
}
// record the new (or changed) broker assignment for this partition
deltaPositive.put(b, kp);
// always update the latest connection information
stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
}
}
// remove entries from the map if the thread is done (for example, when all partitions on this broker have been
// reassigned to other brokers, or are temporarily unavailable)
for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext(); ) {
Entry<Broker, ConsumerThread> item = iterator.next();
if (item.getValue().getThreadItSelf().isDone()) {
iterator.remove();
}
}
for (Broker b : deltaPositive.keySet()) {
if (!simpleConsumerThreads.containsKey(b)) {
// start thread for new broker
ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
simpleConsumerThreads.put(b, ct);
} else {
simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
}
}
deltaPositive.clear();
// reset to 0 after the metadata has been refreshed successfully
retryCounter.set(0);
}
}
}
} // End of SimpleKafkaConsumer