/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.kafka;
import com.datatorrent.api.Context;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import kafka.message.Message;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import javax.validation.constraints.Pattern.Flag;
import java.io.Closeable;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Base Kafka consumer class used by the Kafka input operator
*
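 * <p>
 * A minimal usage sketch (illustrative only; assumes a concrete subclass such as
 * {@code SimpleKafkaConsumer} from this package and a reachable single-cluster ZooKeeper quorum):
 * <pre>{@code
 * KafkaConsumer consumer = new SimpleKafkaConsumer();
 * consumer.setZookeeper("node1:2181,node2:2181,node3:2181");
 * consumer.setTopic("my_topic");
 * consumer.setInitialOffset("earliest");
 * consumer.create();   // called from the operator's setup()
 * consumer.start();    // called from the operator's activate()
 * KafkaConsumer.KafkaMessage message = consumer.pollMessage();
 * consumer.stop();     // called from the operator's deactivate()
 * consumer.teardown(); // called from the operator's teardown()
 * }</pre>
 *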
* @since 0.9.0
*/
public abstract class KafkaConsumer implements Closeable
{
protected final static String HIGHLEVEL_CONSUMER_ID_SUFFIX = "_stream_";
protected final static String SIMPLE_CONSUMER_ID_SUFFIX = "_partition_";
private String zookeeper;
public KafkaConsumer()
{
}
public KafkaConsumer(String topic)
{
this();
this.topic = topic;
}
public KafkaConsumer(String zks, String topic)
{
this.topic = topic;
setZookeeper(zks);
}
private int cacheSize = 1024;
protected transient boolean isAlive = false;
private transient ArrayBlockingQueue<KafkaMessage> holdingBuffer;
/**
* The topic that this consumer consumes
*/
@NotNull
protected String topic = "default_topic";
/**
   * A map of ZooKeeper connection strings keyed by cluster id.
   * This field is mandatory. <br>
   * Each cluster should have only one connection string, containing all nodes in the cluster <br>
   * A ZooKeeper chroot path is also supported <br>
*
* Single cluster zookeeper example: <br>
* &nbsp;&nbsp; node1:2181,node2:2181,node3:2181/your/kafka/data <br>
* Multi-cluster zookeeper example: <br>
* &nbsp;&nbsp; cluster1::node1:2181,node2:2181,node3:2181/cluster1;cluster2::node1:2181/cluster2
*
*/
@NotNull
@Bind(JavaSerializer.class)
SetMultimap<String, String> zookeeperMap;
protected transient SetMultimap<String, String> brokers;
/**
   * The initialOffset can be either "earliest" or "latest".
   * "earliest" means consuming from the beginning of the queue;
   * "latest" means consuming from the current sync point of the queue.
   * The setting is case-insensitive.
   * The default value is "latest".
*/
@Pattern(flags={Flag.CASE_INSENSITIVE}, regexp = "earliest|latest")
protected String initialOffset = "latest";
protected transient SnapShot statsSnapShot = new SnapShot();
protected transient KafkaMeterStats stats = new KafkaMeterStats();
/**
   * This method is called in the setup method of the operator
*/
public void create(){
initBrokers();
holdingBuffer = new ArrayBlockingQueue<KafkaMessage>(cacheSize);
}
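  /**
   * Resolve the broker list for each cluster from the configured ZooKeeper connection strings.
   * The brokers are resolved only once; subsequent calls return immediately if they are already set.
   */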
public void initBrokers()
{
if(brokers!=null){
return ;
}
if(zookeeperMap !=null){
brokers = HashMultimap.create();
for (String clusterId: zookeeperMap.keySet()) {
try {
brokers.putAll(clusterId, KafkaMetadataUtil.getBrokers(zookeeperMap.get(clusterId)));
} catch (Exception e) {
// let the user know where we tried to connect to
throw new RuntimeException("Error resolving brokers for cluster " + clusterId + " " + zookeeperMap.get(clusterId), e);
}
}
}
}
/**
* This method is called in the activate method of the operator
*/
public void start()
{
isAlive = true;
statsSnapShot.start();
}
/**
* The method is called in the deactivate method of the operator
*/
public void stop() {
isAlive = false;
statsSnapShot.stop();
holdingBuffer.clear();
IOUtils.closeQuietly(this);
  }
/**
   * This method is called in the teardown method of the operator
*/
public void teardown()
{
holdingBuffer.clear();
}
public boolean isAlive()
{
return isAlive;
}
public void setAlive(boolean isAlive)
{
this.isAlive = isAlive;
}
/**
   * Set the topic this consumer consumes.
*/
public void setTopic(String topic)
{
this.topic = topic;
}
public String getTopic()
{
return topic;
}
public KafkaMessage pollMessage()
{
return holdingBuffer.poll();
}
public int messageSize()
{
return holdingBuffer.size();
}
public void setInitialOffset(String initialOffset)
{
this.initialOffset = initialOffset;
}
public String getInitialOffset()
{
return initialOffset;
}
public int getCacheSize()
{
return cacheSize;
}
public void setCacheSize(int cacheSize)
{
this.cacheSize = cacheSize;
}
  protected final void putMessage(KafkaPartition partition, Message msg, long offset) throws InterruptedException
  {
    // blocks the caller when the holding buffer is full, throttling further message consumption
holdingBuffer.put(new KafkaMessage(partition, msg, offset));
statsSnapShot.mark(partition, msg.payloadSize());
}
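  /**
   * Commit the offsets of the messages consumed so far; how and where the offsets are committed
   * depends on the concrete consumer implementation.
   */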
protected abstract void commitOffset();
protected abstract Map<KafkaPartition, Long> getCurrentOffsets();
public KafkaMeterStats getConsumerStats()
{
statsSnapShot.setupStats(stats);
return stats;
}
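  /**
   * Reset the partitions assigned to this consumer and the offsets from which consumption should
   * resume.
   */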
protected abstract void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset);
/**
* Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from.
* The operator will discover the brokers that it needs to consume messages from.
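   * <p>
   * Illustrative sketch of the accepted formats (host names and cluster ids below are made-up
   * examples; parsing follows {@code parseZookeeperStr}):
   * <pre>{@code
   * // single cluster, stored under the default cluster id
   * consumer.setZookeeper("node1:2181,node2:2181,node3:2181/your/kafka/data");
   * // two clusters, separated by ';', each prefixed with "<clusterId>::"
   * consumer.setZookeeper("cluster1::node1:2181,node2:2181/cluster1;cluster2::node1:2181/cluster2");
   * }</pre>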
*/
public void setZookeeper(String zookeeper)
{
this.zookeeper = zookeeper;
this.zookeeperMap = parseZookeeperStr(zookeeper);
}
public String getZookeeper()
{
return zookeeper;
}
/**
   * Counter class which exposes the statistics collected by the consumer
*/
public static class KafkaMeterStats implements Serializable
{
private static final long serialVersionUID = -2867402654990209006L;
/**
     * A compact partition counter. The data collected for each partition is 4 bytes brokerId + 1 byte connected + 8 bytes msgs/s + 8 bytes bytes/s + 8 bytes offset
*/
public ConcurrentHashMap<KafkaPartition, PartitionStats> partitionStats = new ConcurrentHashMap<KafkaPartition, PartitionStats>();
/**
     * A compact overall counter. The data collected is 4 bytes connection count + 8 bytes aggregate msgs/s + 8 bytes aggregate bytes/s
*/
public long totalMsgPerSec;
public long totalBytesPerSec;
public KafkaMeterStats()
{
}
public void set_1minMovingAvgPerPartition(KafkaPartition kp, long[] _1minAvgPar)
{
PartitionStats ps = putPartitionStatsIfNotPresent(kp);
ps.msgsPerSec = _1minAvgPar[0];
ps.bytesPerSec = _1minAvgPar[1];
}
public void set_1minMovingAvg(long[] _1minAvg)
{
totalMsgPerSec = _1minAvg[0];
totalBytesPerSec = _1minAvg[1];
}
public void updateOffsets(Map<KafkaPartition, Long> offsets){
for (Entry<KafkaPartition, Long> os : offsets.entrySet()) {
PartitionStats ps = putPartitionStatsIfNotPresent(os.getKey());
ps.offset = os.getValue();
}
}
public int getConnections()
{
int r = 0;
for (PartitionStats ps : partitionStats.values()) {
if (!StringUtils.isEmpty(ps.brokerHost)) {
r++;
}
}
return r;
}
public void updatePartitionStats(KafkaPartition kp,int brokerId, String host)
{
PartitionStats ps = putPartitionStatsIfNotPresent(kp);
ps.brokerHost = host;
ps.brokerId = brokerId;
}
private synchronized PartitionStats putPartitionStatsIfNotPresent(KafkaPartition kp){
PartitionStats ps = partitionStats.get(kp);
if (ps == null) {
ps = new PartitionStats();
partitionStats.put(kp, ps);
}
return ps;
}
}
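  /**
   * Simple holder that pairs a Kafka {@link Message} with the {@link KafkaPartition} it was read
   * from and its offset within that partition.
   */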
public static class KafkaMessage
{
KafkaPartition kafkaPart;
Message msg;
long offSet;
public KafkaMessage(KafkaPartition kafkaPart, Message msg, long offset)
{
this.kafkaPart = kafkaPart;
this.msg = msg;
this.offSet = offset;
}
public KafkaPartition getKafkaPart()
{
return kafkaPart;
}
public Message getMsg()
{
return msg;
}
public long getOffSet()
{
return offSet;
}
}
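  /**
   * Convenience methods for extracting per-partition offsets and 1-minute moving averages from
   * collected {@link KafkaMeterStats}.
   */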
public static class KafkaMeterStatsUtil {
public static Map<KafkaPartition, Long> getOffsetsForPartitions(List<KafkaMeterStats> kafkaMeterStats)
{
Map<KafkaPartition, Long> result = Maps.newHashMap();
for (KafkaMeterStats kms : kafkaMeterStats) {
for (Entry<KafkaPartition, PartitionStats> item : kms.partitionStats.entrySet()) {
result.put(item.getKey(), item.getValue().offset);
}
}
return result;
}
public static Map<KafkaPartition, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats)
{
Map<KafkaPartition, long[]> result = Maps.newHashMap();
for (Entry<KafkaPartition, PartitionStats> item : kafkaMeterStats.partitionStats.entrySet()) {
result.put(item.getKey(), new long[]{item.getValue().msgsPerSec, item.getValue().bytesPerSec});
}
return result;
}
}
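  /**
   * Aggregator that merges a collection of {@link KafkaMeterStats} counters into a single view:
   * partition stats are combined and the aggregate msg/s and bytes/s totals are summed.
   */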
public static class KafkaMeterStatsAggregator implements Context.CountersAggregator, Serializable{
/**
*
*/
private static final long serialVersionUID = 729987800215151678L;
@Override
public Object aggregate(Collection<?> countersList)
{
KafkaMeterStats kms = new KafkaMeterStats();
for (Object o : countersList) {
if (o instanceof KafkaMeterStats){
KafkaMeterStats subKMS = (KafkaMeterStats)o;
kms.partitionStats.putAll(subKMS.partitionStats);
kms.totalBytesPerSec += subKMS.totalBytesPerSec;
kms.totalMsgPerSec += subKMS.totalMsgPerSec;
}
}
return kms;
}
}
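  /**
   * Per-partition statistics: the broker id and host the consumer is connected to, the 1-minute
   * moving averages of msgs/s and bytes/s, and the latest consumed offset.
   */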
public static class PartitionStats implements Serializable {
/**
*
*/
private static final long serialVersionUID = -6572690643487689766L;
public int brokerId = -1;
public long msgsPerSec;
public long bytesPerSec;
public long offset;
public String brokerHost = "";
}
/**
   * A snapshot of the message consumption rate over the last minute
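   * <p>
   * Implementation note (describing the arrays below): each counter array has 61 slots; slots
   * 0-59 form a per-second ring buffer and slot 60 holds the running 1-minute total. A scheduled
   * task advances the cursor once per second and subtracts the evicted second from the total,
   * while {@code last} counts the seconds elapsed (capped at 60) so averages stay correct during
   * the first minute.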
*/
static class SnapShot {
// msgs/s and bytes/s for each partition
/**
* 1 min total msg number for each partition
*/
private final Map<KafkaPartition, long[]> _1_min_msg_sum_par = new HashMap<KafkaPartition, long[]>();
/**
* 1 min total byte number for each partition
*/
private final Map<KafkaPartition, long[]> _1_min_byte_sum_par = new HashMap<KafkaPartition, long[]>();
    private int cursor = 0; // index of the current second within the 60-slot ring
/**
     * total messages for each second; msgSec[60] holds the running 1-minute total
*/
private final long[] msgSec = new long[61];
/**
     * total bytes for each second; bytesSec[60] holds the running 1-minute total
*/
private final long[] bytesSec = new long[61];
    private short last = 1; // seconds elapsed so far, capped at 60; used as the divisor for the moving averages
private ScheduledExecutorService service;
public synchronized void moveNext()
{
cursor = (cursor + 1) % 60;
msgSec[60] -= msgSec[cursor];
bytesSec[60] -= bytesSec[cursor];
msgSec[cursor] = 0;
bytesSec[cursor] = 0;
for (Entry<KafkaPartition, long[]> item : _1_min_msg_sum_par.entrySet()) {
long[] msgv = item.getValue();
long[] bytesv = _1_min_byte_sum_par.get(item.getKey());
msgv[60] -= msgv[cursor];
bytesv[60] -= bytesv[cursor];
msgv[cursor] = 0;
bytesv[cursor] = 0;
}
}
    public void start(){
      // recreate the executor if it has never been created or was shut down by stop()
      if(service==null || service.isShutdown()){
        service = Executors.newScheduledThreadPool(1);
      }
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run()
{
moveNext();
if(last<60)last++;
}
}, 1, 1, TimeUnit.SECONDS);
}
public void stop(){
if(service!=null){
service.shutdown();
}
}
public synchronized void mark(KafkaPartition partition, long bytes){
msgSec[cursor]++;
msgSec[60]++;
bytesSec[cursor] += bytes;
bytesSec[60] += bytes;
long[] msgv = _1_min_msg_sum_par.get(partition);
long[] bytev = _1_min_byte_sum_par.get(partition);
if(msgv == null){
msgv = new long[61];
bytev = new long[61];
_1_min_msg_sum_par.put(partition, msgv);
_1_min_byte_sum_par.put(partition, bytev);
}
msgv[cursor]++;
msgv[60]++;
bytev[cursor] += bytes;
bytev[60] += bytes;
}
public synchronized void setupStats(KafkaMeterStats stat){
long[] _1minAvg = {msgSec[60]/last, bytesSec[60]/last};
for (Entry<KafkaPartition, long[]> item : _1_min_msg_sum_par.entrySet()) {
long[] msgv =item.getValue();
long[] bytev = _1_min_byte_sum_par.get(item.getKey());
long[] _1minAvgPar = {msgv[60]/last, bytev[60]/last};
stat.set_1minMovingAvgPerPartition(item.getKey(), _1minAvgPar);
}
stat.set_1minMovingAvg(_1minAvg);
}
}
private SetMultimap<String, String> parseZookeeperStr(String zookeeper)
{
SetMultimap<String, String> theClusters = HashMultimap.create();
    // cluster entries are separated by ';'; each entry is either "connectionString" or "clusterId::connectionString"
    for (String zk : zookeeper.split(";")) {
      String[] parts = zk.split("::");
      // an entry without an explicit cluster id is stored under the default cluster id
      String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
      theClusters.put(clusterId, parts.length == 1 ? parts[0] : parts[1]);
}
return theClusters;
}
}