/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
import locks.ReentrantLock
import scala.collection._
import kafka.cluster._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.api.OffsetRequest
import java.util.UUID
import kafka.serializer.Decoder
import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
import java.lang.IllegalStateException
import kafka.utils.ZkUtils._
/**
* This class handles the consumer's interaction with zookeeper
*
* Directories:
* 1. Consumer id registry:
* /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
* A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
* and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
* A consumer subscribes to event changes of the consumer id registry within its group.
*
* The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential
* ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out
* whether the creation of a sequential znode has succeeded or not. More details can be found at
* (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling)
*
* 2. Broker node registry:
* /brokers/[0...N] --> { "host" : "host:port",
* "topics" : {"topic1": ["partition1" ... "partitionN"], ...,
* "topicN": ["partition1" ... "partitionN"] } }
* This is a list of all present broker nodes. A unique logical node id is configured on each broker node. A broker
* node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode
* is a JSON String that contains (1) the host name and the port the broker is listening on, (2) a list of topics that
* the broker serves, (3) a list of logical partitions assigned to each topic on the broker.
* A consumer subscribes to event changes of the broker node registry.
*
* 3. Partition owner registry:
* /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
* This stores the mapping between broker partitions and consumers. Each partition is owned by a unique consumer
* within a consumer group. The mapping is reestablished after each rebalancing.
*
* 4. Consumer offset tracking:
* /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
* Each consumer tracks the offset of the latest message consumed for each partition.
*
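* As a purely illustrative sketch (hypothetical group, topic, broker id, and values), the znodes for a group
* "group1" consuming "topic1" from broker 1 might look like:
*   /consumers/group1/ids/group1_host1-1315430987123-3c8d5b42  -> topic1
*   /consumers/group1/owner/topic1/1-0                         -> group1_host1-1315430987123-3c8d5b42-0
*   /consumers/group1/offsets/topic1/1-0                       -> 42
*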
*/
private[kafka] object ZookeeperConsumerConnector {
val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
}
/**
* JMX interface for monitoring consumer
*/
trait ZookeeperConsumerConnectorMBean {
def getPartOwnerStats: String
def getConsumerGroup: String
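// Offset lag for a partition is computed as getLatestOffset - getConsumedOffset (see the implementation below).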
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
}
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
extends ConsumerConnector with ZookeeperConsumerConnectorMBean
with Logging {
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[Fetcher] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
// topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
private var loadBalancerListener: ZKRebalancerListener = null
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
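// The consumer id registered in ZK is "<groupId>_<consumerUuid>"; unless config.consumerId is set, the uuid
// part is "<hostname>-<timestampMs>-<first 8 hex chars of a random UUID>", e.g. (illustrative)
// "group1_host1-1315430987123-3c8d5b42".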
val consumerIdString = {
var consumerUuid : String = null
config.consumerId match {
case Some(consumerId) // for testing only
=> consumerUuid = consumerId
case None // generate unique consumerId automatically
=> val uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8))
}
config.groupId + "_" + consumerUuid
}
this.logIdent = consumerIdString + " "
connectZk()
createFetcher()
if (config.autoCommit) {
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
}
def this(config: ConsumerConfig) = this(config, true)
def createMessageStreams[T](topicCountMap: Map[String,Int],
decoder: Decoder[T])
: Map[String,List[KafkaStream[T]]] = {
if (messageStreamCreated.getAndSet(true))
throw new RuntimeException(this.getClass.getSimpleName +
" can create message streams at most once")
consume(topicCountMap, decoder)
}
def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
wildcardStreamsHandler.streams
}
private def createFetcher() {
if (enableFetcher)
fetcher = Some(new Fetcher(config, zkClient))
}
private def connectZk() {
info("Connecting to zookeeper instance at " + config.zkConnect)
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
}
def shutdown() {
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
info("ZKConsumerConnector shutting down")
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
try {
scheduler.shutdownNow()
fetcher match {
case Some(f) => f.stopConnectionsToAllBrokers
case None =>
}
sendShutdownToAllQueues()
if (config.autoCommit)
commitOffsets()
if (zkClient != null) {
zkClient.close()
zkClient = null
}
}
catch {
case e =>
fatal("error during consumer connector shutdown", e)
}
info("ZKConsumerConnector shut down completed")
}
}
def consume[T](topicCountMap: scala.collection.Map[String,Int],
decoder: Decoder[T])
: Map[String,List[KafkaStream[T]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val stream = new KafkaStream[T](
queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
(queue, stream)
})
).flatten.toList
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
}
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
createEphemeralPathExpectConflict(zkClient,
dirs.consumerRegistryDir + "/" + consumerIdString,
topicCount.dbString)
info("end registering consumer " + consumerIdString + " in ZK")
}
private def sendShutdownToAllQueues() = {
for (queue <- topicThreadIdAndQueues.values) {
debug("Clearing up queue")
queue.clear()
queue.put(ZookeeperConsumerConnector.shutdownCommand)
debug("Cleared queue and sent shutdown command")
}
}
def autoCommit() {
trace("auto committing")
try {
commitOffsets()
}
catch {
case t: Throwable =>
// log it and let it go
error("exception during autoCommit: ", t)
}
}
def commitOffsets() {
if (zkClient == null) {
error("zk client is null. Cannot commit offsets")
return
}
for ((topic, infos) <- topicRegistry) {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
for (info <- infos.values) {
val newOffset = info.getConsumeOffset
try {
updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
newOffset.toString)
}
catch {
case t: Throwable =>
// log it and let it go
warn("exception during commitOffsets", t)
}
debug("Committed offset " + newOffset + " for topic " + info)
}
}
}
// for JMX
def getPartOwnerStats(): String = {
val builder = new StringBuilder
for ((topic, infos) <- topicRegistry) {
builder.append("\n" + topic + ": [")
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
for(partition <- infos.values) {
builder.append("\n {")
builder.append(partition.partition.name)
builder.append(",fetch offset:" + partition.getFetchOffset)
builder.append(",consumer offset:" + partition.getConsumeOffset)
builder.append("}")
}
builder.append("\n ]")
}
builder.toString
}
// for JMX
def getConsumerGroup(): String = config.groupId
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long =
getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
val partition = new Partition(brokerId, partitionId)
val partitionInfos = topicRegistry.get(topic)
if (partitionInfos != null) {
val partitionInfo = partitionInfos.get(partition)
if (partitionInfo != null)
return partitionInfo.getConsumeOffset
}
//otherwise, try to get it from zookeeper
try {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
val offsetString = readDataMaybeNull(zkClient, znode)
if (offsetString != null)
return offsetString.toLong
else
return -1
}
catch {
case e =>
error("error in getConsumedOffset JMX ", e)
}
return -2
}
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
try {
val cluster = getCluster(zkClient)
val broker = cluster.getBroker(brokerId) match {
case Some(b) => b
case None => throw new IllegalStateException("Broker " + brokerId + " is unavailable. Cannot issue " +
"getOffsetsBefore request")
}
simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
ConsumerConfig.SocketBufferSize)
val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
producedOffset = offsets(0)
}
catch {
case e =>
error("error in earliestOrLatestOffset() ", e)
}
finally {
if (simpleConsumer != null)
simpleConsumer.close
}
producedOffset
}
class ZKSessionExpireListener(val dirs: ZKGroupDirs,
val consumerIdString: String,
val topicCount: TopicCount,
val loadBalancerListener: ZKRebalancerListener)
extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
* any ephemeral nodes here.
*
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleNewSession() {
/**
* When we get a SessionExpired event, we have lost all ephemeral nodes and zkclient has reestablished a
* connection for us. We need to release the partition ownership held by this consumer, re-register it
* in the consumer registry, and trigger a rebalance.
*/
info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
loadBalancerListener.resetState()
registerConsumerInZK(dirs, consumerIdString, topicCount)
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
// There is no need to resubscribe to child and state changes.
// The child change watchers will be set inside rebalance when we read the children list.
}
}
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
extends IZkChildListener {
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
info("starting watcher executor thread for consumer " + consumerIdString)
var doRebalance = false
while (!isShuttingDown.get) {
try {
lock.lock()
try {
if (!isWatcherTriggered)
cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
} finally {
doRebalance = isWatcherTriggered
isWatcherTriggered = false
lock.unlock()
}
if (doRebalance)
syncedRebalance
} catch {
case t => error("error during syncedRebalance", t)
}
}
info("stopping watcher executor thread for consumer " + consumerIdString)
}
}
watcherExecutorThread.start()
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
lock.lock()
try {
isWatcherTriggered = true
cond.signalAll()
} finally {
lock.unlock()
}
}
private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
val topicDirs = new ZKGroupTopicDirs(group, topic)
val znode = topicDirs.consumerOwnerDir + "/" + partition
deletePath(zkClient, znode)
debug("Consumer " + consumerIdString + " releasing " + znode)
}
private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]])= {
info("Releasing partition ownership")
for ((topic, infos) <- localTopicRegistry) {
for(partition <- infos.keys)
deletePartitionOwnershipFromZK(topic, partition.toString)
localTopicRegistry.remove(topic)
}
}
def resetState() {
topicRegistry.clear
}
def syncedRebalance() {
rebalanceLock synchronized {
for (i <- 0 until config.maxRebalanceRetries) {
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
val cluster = getCluster(zkClient)
try {
done = rebalance(cluster)
}
catch {
case e =>
/** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
* For example, a ZK node can disappear between the time we get all children and the time we try to get
* the value of a child. Just let this go since another rebalance will be triggered.
**/
info("exception during rebalance ", e)
}
info("end rebalancing consumer " + consumerIdString + " try #" + i)
if (done) {
return
} else {
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
}
// stop all fetchers and clear all the queues to avoid data duplication
closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
Thread.sleep(config.rebalanceBackoffMs)
}
}
throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries + " retries")
}
private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
/**
* fetchers must be stopped to avoid data duplication, since if the current
* rebalancing attempt fails, the partitions that are released could be owned by another consumer.
* But if we don't stop the fetchers first, this consumer would continue returning data for released
* partitions in parallel. So, not stopping the fetchers leads to duplicate data.
*/
closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
releasePartitionOwnership(topicRegistry)
var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
var currentTopicRegistry = new Pool[String, Pool[Partition, PartitionTopicInfo]]
for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
currentTopicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
var curPartitions: List[String] = partitionsPerTopicMap.get(topic).get
val nPartsPerConsumer = curPartitions.size / curConsumers.size
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
" for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- consumerThreadIdSet) {
val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
/**
* Range-partition the sorted partitions to consumers for better locality.
* The first few consumers pick up an extra partition, if any.
*/
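// Worked example (illustrative numbers): with 5 partitions and 2 consumer threads,
// nPartsPerConsumer = 2 and nConsumersWithExtraPart = 1, so thread 0 gets startPart = 0,
// nParts = 3 (partitions 0,1,2) and thread 1 gets startPart = 3, nParts = 2 (partitions 3,4).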
if (nParts <= 0)
warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
else {
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
// record the partition ownership decision
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
}
}
}
/**
* move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt.
* A rebalancing attempt is completed successfully only after the fetchers have been started correctly.
*/
if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
info("Updating the cache")
debug("Partitions per topic cache " + partitionsPerTopicMap)
debug("Consumers per topic cache " + consumersPerTopicMap)
topicRegistry = currentTopicRegistry
updateFetcher(cluster)
true
} else {
false
}
}
private def closeFetchersForQueues(cluster: Cluster,
messageStreams: Map[String,List[KafkaStream[_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
case Some(f) => f.stopConnectionsToAllBrokers
f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
info("Committing all offsets after clearing the fetcher queues")
/**
* here, we need to commit offsets before stopping the consumer from returning any more messages
* from the current data chunk. Since partition ownership is not yet released, this commit offsets
* call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
* for the current data chunk. Since the fetchers are already shut down and this is the last chunk to be iterated
* by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
* successfully and the fetchers restart to fetch more data chunks
**/
commitOffsets
case None =>
}
}
private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
}
private def updateFetcher(cluster: Cluster) {
// update partitions for fetcher
var allPartitionInfos : List[PartitionTopicInfo] = Nil
for (partitionInfos <- topicRegistry.values)
for (partition <- partitionInfos.values)
allPartitionInfos ::= partition
info("Consumer " + consumerIdString + " selected partitions : " +
allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))
fetcher match {
case Some(f) =>
f.startConnections(allPartitionInfos, cluster)
case None =>
}
}
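// Attempts to claim each decided partition by creating its ephemeral owner znode. If any claim fails
// because the previous owner has not yet released the node, every znode claimed in this pass is deleted
// and false is returned so that the caller can back off and retry the rebalance.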
private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
var successfullyOwnedPartitions : List[(String, String)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1._1
val partition = partitionOwner._1._2
val consumerThreadId = partitionOwner._2
val topicDirs = new ZKGroupTopicDirs(group, topic)
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
try {
createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
successfullyOwnedPartitions ::= (topic, partition)
true
}
catch {
case e: ZkNodeExistsException =>
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition)
false
case e2 => throw e2
}
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
/* even if one of the partition ownership attempts has failed, return false */
if(hasPartitionOwnershipFailed > 0) {
// remove all paths that we have owned in ZK
successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
false
}
else true
}
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]],
topicDirs: ZKGroupTopicDirs, partitionString: String,
topic: String, consumerThreadId: String) {
val partition = Partition.parse(partitionString)
val partTopicInfoMap = currentTopicRegistry.get(topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
val offsetString = readDataMaybeNull(zkClient, znode)
// If this is the first time the consumer is starting, set the initial offset based on the autoOffsetReset config
var offset : Long = 0L
if (offsetString == null)
offset = config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.EarliestTime)
case OffsetRequest.LargestTimeString =>
earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.LatestTime)
case _ =>
throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
}
else
offset = offsetString.toLong
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
partition.brokerId,
partition,
queue,
consumedOffset,
fetchedOffset,
new AtomicInteger(config.fetchSize))
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
}
}
private def reinitializeConsumer[T](
topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
val dirs = new ZKGroupDirs(config.groupId)
// listener to consumer and partition changes
if (loadBalancerListener == null) {
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
loadBalancerListener = new ZKRebalancerListener(
config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
}
// register listener for session expired event
if (sessionExpirationListener == null)
sessionExpirationListener = new ZKSessionExpireListener(
dirs, consumerIdString, topicCount, loadBalancerListener)
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
// map of {topic -> Set(thread-1, thread-2, ...)}
val consumerThreadIdsPerTopic: Map[String, Set[String]] =
topicCount.getConsumerThreadIdsPerTopic
/*
* This usage of map flatten breaks up consumerThreadIdsPerTopic into
* a set of (topic, thread-id) pairs that we then use to construct
* the updated (topic, thread-id) -> queues map
*/
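// For instance (illustrative names), Map("topicA" -> Set("c1-0", "c1-1")) flattens, via the implicit
// conversion below, into the pairs ("topicA", "c1-0") and ("topicA", "c1-1").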
implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
// iterator over (topic, thread-id) tuples
val topicThreadIds: Iterable[(String, String)] =
consumerThreadIdsPerTopic.flatten
// list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
val threadQueueStreamPairs = topicCount match {
case wildTopicCount: WildcardTopicCount =>
for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
case statTopicCount: StaticTopicCount => {
require(topicThreadIds.size == queuesAndStreams.size,
"Mismatch between thread ID count (%d) and queue count (%d)".format(
topicThreadIds.size, queuesAndStreams.size))
topicThreadIds.zip(queuesAndStreams)
}
}
threadQueueStreamPairs.foreach(e => {
val topicThreadId = e._1
val q = e._2._1
topicThreadIdAndQueues.put(topicThreadId, q)
})
val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
groupedByTopic.foreach(e => {
val topic = e._1
val streams = e._2.map(_._2._2).toList
topicStreamsMap += (topic -> streams)
debug("adding topic %s and %d streams to map.".format(topic, streams.size))
})
// register the session expiration listener and watch the consumer id registry for membership changes
zkClient.subscribeStateChanges(sessionExpirationListener)
zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
}
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
}
class WildcardStreamsHandler[T](topicFilter: TopicFilter,
numStreams: Int,
decoder: Decoder[T])
extends TopicEventHandler[String] {
if (messageStreamCreated.getAndSet(true))
throw new RuntimeException("Each consumer connector can create " +
"message streams by filter at most once.")
private val wildcardQueuesAndStreams = (1 to numStreams)
.map(e => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
val stream = new KafkaStream[T](
queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
(queue, stream)
}).toList
// bootstrap with existing topics
private var wildcardTopics =
getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
.filter(topicFilter.isTopicAllowed)
private val wildcardTopicCount = TopicCount.constructTopicCount(
consumerIdString, topicFilter, numStreams, zkClient)
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
if (!topicFilter.requiresTopicEventWatcher) {
info("Not creating event watcher for trivial whitelist " + topicFilter)
}
else {
info("Creating topic event watcher for whitelist " + topicFilter)
wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
/*
* Topic events will trigger subsequent synced rebalances. Also, the
* consumer will get registered only after an allowed topic becomes
* available.
*/
}
def handleTopicEvent(allTopics: Seq[String]) {
debug("Handling topic event")
val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
val addedTopics = updatedTopics filterNot (wildcardTopics contains)
if (addedTopics.nonEmpty)
info("Topic event: added topics = %s"
.format(addedTopics))
/*
* TODO: Deleted topics are interesting (and will not be a concern until
* 0.8 release). We may need to remove these topics from the rebalance
* listener's map in reinitializeConsumer.
*/
val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
if (deletedTopics.nonEmpty)
info("Topic event: deleted topics = %s"
.format(deletedTopics))
wildcardTopics = updatedTopics
info("Topics to consume = %s".format(wildcardTopics))
if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
}
def streams: Seq[KafkaStream[T]] =
wildcardQueuesAndStreams.map(_._2)
}
}