blob: 9bf0d200f93fc98c32794260e26c058bdba86fe4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import java.net.InetAddress
import java.util.UUID
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.client.ClientUtils
import kafka.cluster._
import kafka.common._
import kafka.javaapi.consumer.ConsumerRebalanceListener
import kafka.metrics._
import kafka.network.BlockingChannel
import kafka.serializer._
import kafka.utils.CoreUtils.inLock
import kafka.utils.ZkUtils._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
import scala.collection.JavaConverters._
/**
* This class handles the consumers interaction with zookeeper
*
* Directories:
* 1. Consumer id registry:
* /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
* A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
* and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
* A consumer subscribes to event changes of the consumer id registry within its group.
*
* The consumer id is picked up from configuration, instead of the sequential id assigned by ZK. Generated sequential
* ids are hard to recover during temporary connection loss to ZK, since it's difficult for the client to figure out
* whether the creation of a sequential znode has succeeded or not. More details can be found at
* (http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling)
*
* 2. Broker node registry:
* /brokers/[0...N] --> { "host" : "host:port",
* "topics" : {"topic1": ["partition1" ... "partitionN"], ...,
* "topicN": ["partition1" ... "partitionN"] } }
* This is a list of all present broker brokers. A unique logical node id is configured on each broker node. A broker
* node registers itself on start-up and creates a znode with the logical node id under /brokers. The value of the znode
* is a JSON String that contains (1) the host name and the port the broker is listening to, (2) a list of topics that
* the broker serves, (3) a list of logical partitions assigned to each topic on the broker.
* A consumer subscribes to event changes of the broker node registry.
*
* 3. Partition owner registry:
* /consumers/[group_id]/owner/[topic]/[broker_id-partition_id] --> consumer_node_id
* This stores the mapping before broker partitions and consumers. Each partition is owned by a unique consumer
* within a consumer group. The mapping is reestablished after each rebalancing.
*
* 4. Consumer offset tracking:
* /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
* Each consumer tracks the offset of the latest message consumed for each partition.
*
*/
private[kafka] object ZookeeperConsumerConnector {
val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
}
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
extends ConsumerConnector with Logging with KafkaMetricsGroup {
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkUtils: ZkUtils = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = null
private var offsetsChannel: BlockingChannel = null
private val offsetsChannelLock = new Object
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
private var consumerRebalanceListener: ConsumerRebalanceListener = null
// useful for tracking migration of consumers to store offsets in kafka
private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
newGauge(
"yammer-metrics-count",
new Gauge[Int] {
def value = {
com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
}
}
)
val consumerIdString = {
var consumerUuid : String = null
config.consumerId match {
case Some(consumerId) // for testing only
=> consumerUuid = consumerId
case None // generate unique consumerId automatically
=> val uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8))
}
config.groupId + "_" + consumerUuid
}
this.logIdent = "[" + consumerIdString + "], "
connectZk()
createFetcher()
ensureOffsetManagerConnected()
if (config.autoCommitEnable) {
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
scheduler.schedule("kafka-consumer-autocommit",
autoCommit,
delay = config.autoCommitIntervalMs,
period = config.autoCommitIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
KafkaMetricsReporter.startReporters(config.props)
AppInfo.registerInfo()
def this(config: ConsumerConfig) = this(config, true)
def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] =
createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String, List[KafkaStream[K,V]]] = {
if (messageStreamCreated.getAndSet(true))
throw new MessageStreamsExistException(this.getClass.getSimpleName +
" can create message streams at most once",null)
consume(topicCountMap, keyDecoder, valueDecoder)
}
def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
numStreams: Int,
keyDecoder: Decoder[K] = new DefaultDecoder(),
valueDecoder: Decoder[V] = new DefaultDecoder()) = {
val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
wildcardStreamsHandler.streams
}
def setConsumerRebalanceListener(listener: ConsumerRebalanceListener) {
if (messageStreamCreated.get())
throw new MessageStreamsExistException(this.getClass.getSimpleName +
" can only set consumer rebalance listener before creating streams",null)
consumerRebalanceListener = listener
}
private def createFetcher() {
if (enableFetcher)
fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkUtils))
}
private def connectZk() {
info("Connecting to zookeeper instance at " + config.zkConnect)
zkUtils = ZkUtils(config.zkConnect,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
JaasUtils.isZkSecurityEnabled())
}
// Blocks until the offset manager is located and a channel is established to it.
private def ensureOffsetManagerConnected() {
if (config.offsetsStorage == "kafka") {
if (offsetsChannel == null || !offsetsChannel.isConnected)
offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkUtils,
config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port))
}
}
def shutdown() {
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
info("ZKConsumerConnector shutting down")
val startTime = System.nanoTime()
KafkaMetricsGroup.removeAllConsumerMetrics(config.clientId)
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
rebalanceLock synchronized {
try {
if (config.autoCommitEnable)
scheduler.shutdown()
fetcher match {
case Some(f) => f.stopConnections
case None =>
}
sendShutdownToAllQueues()
if (config.autoCommitEnable)
commitOffsets(true)
if (zkUtils != null) {
zkUtils.close()
zkUtils = null
}
if (offsetsChannel != null) offsetsChannel.disconnect()
} catch {
case e: Throwable =>
fatal("error during consumer connector shutdown", e)
}
info("ZKConsumerConnector shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
}
}
}
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String,List[KafkaStream[K,V]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)
loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
// this API is used by unit tests only
def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
val timestamp = Time.SYSTEM.milliseconds.toString
val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
"timestamp" -> timestamp))
val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo,
zkUtils.zkConnection.getZookeeper,
false)
zkWatchedEphemeral.create()
info("end registering consumer " + consumerIdString + " in ZK")
}
private def sendShutdownToAllQueues() = {
for (queue <- topicThreadIdAndQueues.values.toSet[BlockingQueue[FetchedDataChunk]]) {
debug("Clearing up queue")
queue.clear()
queue.put(ZookeeperConsumerConnector.shutdownCommand)
debug("Cleared queue and sent shutdown command")
}
}
def autoCommit() {
trace("auto committing")
try {
commitOffsets(isAutoCommit = false)
}
catch {
case t: Throwable =>
// log it and let it go
error("exception during autoCommit: ", t)
}
}
def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
if (checkpointedZkOffsets.get(topicPartition) != offset) {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
checkpointedZkOffsets.put(topicPartition, offset)
zkCommitMeter.mark()
}
}
/**
* KAFKA-1743: This method added for backward compatibility.
*/
def commitOffsets { commitOffsets(true) }
def commitOffsets(isAutoCommit: Boolean) {
val offsetsToCommit =
immutable.Map(topicRegistry.values.flatMap { partitionTopicInfos =>
partitionTopicInfos.values.map { info =>
TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
}
}.toSeq: _*)
commitOffsets(offsetsToCommit, isAutoCommit)
}
def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) {
trace("OffsetMap: %s".format(offsetsToCommit))
var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit
var done = false
while (!done) {
val committed = offsetsChannelLock synchronized {
// committed when we receive either no error codes or only MetadataTooLarge errors
if (offsetsToCommit.size > 0) {
if (config.offsetsStorage == "zookeeper") {
offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) =>
commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
}
true
} else {
val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId)
ensureOffsetManagerConnected()
try {
kafkaCommitMeter.mark(offsetsToCommit.size)
offsetsChannel.send(offsetCommitRequest)
val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
trace("Offset commit response: %s.".format(offsetCommitResponse))
val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) =>
if (errorCode == Errors.NONE.code && config.dualCommitEnabled) {
val offset = offsetsToCommit(topicPartition).offset
commitOffsetToZooKeeper(topicPartition, offset)
}
(folded._1 || // update commitFailed
errorCode != Errors.NONE.code,
folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
(errorCode != Errors.NONE.code && errorCode != Errors.OFFSET_METADATA_TOO_LARGE.code),
folded._3 || // update shouldRefreshCoordinator
errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code ||
errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code,
// update error count
folded._4 + (if (errorCode != Errors.NONE.code) 1 else 0))
}
}
debug(errorCount + " errors in offset commit response.")
if (shouldRefreshCoordinator) {
debug("Could not commit offsets (because offset coordinator has moved or is unavailable).")
offsetsChannel.disconnect()
}
if (commitFailed && retryableIfFailed)
false
else
true
}
catch {
case t: Throwable =>
error("Error while committing offsets.", t)
offsetsChannel.disconnect()
false
}
}
} else {
debug("No updates to offsets since last commit.")
true
}
}
done = {
retriesRemaining -= 1
retriesRemaining == 0 || committed
}
if (!done) {
debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs))
Thread.sleep(config.offsetsChannelBackoffMs)
}
}
}
private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
offsetString match {
case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong))
case None => (topicPartition, OffsetMetadataAndError.NoOffset)
}
}
private def fetchOffsets(partitions: Seq[TopicAndPartition]) = {
if (partitions.isEmpty)
Some(OffsetFetchResponse(Map.empty))
else if (config.offsetsStorage == "zookeeper") {
val offsets = partitions.map(fetchOffsetFromZooKeeper)
Some(OffsetFetchResponse(immutable.Map(offsets:_*)))
} else {
val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = partitions, clientId = config.clientId)
var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
while (!isShuttingDown.get && !offsetFetchResponseOpt.isDefined) {
offsetFetchResponseOpt = offsetsChannelLock synchronized {
ensureOffsetManagerConnected()
try {
offsetsChannel.send(offsetFetchRequest)
val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload())
trace("Offset fetch response: %s.".format(offsetFetchResponse))
val (leaderChanged, loadInProgress) =
offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
(folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
}
if (leaderChanged) {
offsetsChannel.disconnect()
debug("Could not fetch offsets (because offset manager has moved).")
None // retry
}
else if (loadInProgress) {
debug("Could not fetch offsets (because offset cache is being loaded).")
None // retry
}
else {
if (config.dualCommitEnabled) {
// if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the
// maximum between offsets in zookeeper and kafka.
val kafkaOffsets = offsetFetchResponse.requestInfo
val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
(topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, Errors.NONE.code))
}
Some(OffsetFetchResponse(mostRecentOffsets))
}
else
Some(offsetFetchResponse)
}
}
catch {
case e: Exception =>
warn("Error while fetching offsets from %s:%d. Possible cause: %s".format(offsetsChannel.host, offsetsChannel.port, e.getMessage))
offsetsChannel.disconnect()
None // retry
}
}
if (offsetFetchResponseOpt.isEmpty) {
debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs))
Thread.sleep(config.offsetsChannelBackoffMs)
}
}
offsetFetchResponseOpt
}
}
class ZKSessionExpireListener(val dirs: ZKGroupDirs,
val consumerIdString: String,
val topicCount: TopicCount,
val loadBalancerListener: ZKRebalancerListener)
extends IZkStateListener {
@throws[Exception]
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
* any ephemeral nodes here.
*
* @throws Exception
* On any error.
*/
@throws[Exception]
def handleNewSession() {
/**
* When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
* connection for us. We need to release the ownership of the current consumer and re-register this
* consumer in the consumer registry and trigger a rebalance.
*/
info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
loadBalancerListener.resetState()
registerConsumerInZK(dirs, consumerIdString, topicCount)
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
// There is no need to resubscribe to child and state changes.
// The child change watchers will be set inside rebalance when we read the children list.
}
override def handleSessionEstablishmentError(error: Throwable): Unit = {
fatal("Could not establish session with zookeeper", error)
}
}
class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener)
extends IZkDataListener {
def handleDataChange(dataPath : String, data: Object) {
try {
info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
// queue up the rebalance event
loadBalancerListener.rebalanceEventTriggered()
// There is no need to re-subscribe the watcher since it will be automatically
// re-registered upon firing of this event by zkClient
} catch {
case e: Throwable => error("Error while handling topic partition change for data path " + dataPath, e )
}
}
@throws[Exception]
def handleDataDeleted(dataPath : String) {
// TODO: This need to be implemented when we support delete topic
warn("Topic for path " + dataPath + " gets deleted, which should not happen at this time")
}
}
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
private val partitionAssignor = PartitionAssignor.createInstance(config.partitionAssignmentStrategy)
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@volatile private var allTopicsOwnedPartitionsCount = 0
newGauge("OwnedPartitionsCount",
new Gauge[Int] {
def value() = allTopicsOwnedPartitionsCount
},
Map("clientId" -> config.clientId, "groupId" -> config.groupId))
private def ownedPartitionsCountMetricTags(topic: String) = Map("clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic)
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
info("starting watcher executor thread for consumer " + consumerIdString)
var doRebalance = false
while (!isShuttingDown.get) {
try {
lock.lock()
try {
if (!isWatcherTriggered)
cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
} finally {
doRebalance = isWatcherTriggered
isWatcherTriggered = false
lock.unlock()
}
if (doRebalance)
syncedRebalance
} catch {
case t: Throwable => error("error during syncedRebalance", t)
}
}
info("stopping watcher executor thread for consumer " + consumerIdString)
}
}
watcherExecutorThread.start()
@throws[Exception]
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
rebalanceEventTriggered()
}
def rebalanceEventTriggered() {
inLock(lock) {
isWatcherTriggered = true
cond.signalAll()
}
}
private def deletePartitionOwnershipFromZK(topic: String, partition: Int) {
val topicDirs = new ZKGroupTopicDirs(group, topic)
val znode = topicDirs.consumerOwnerDir + "/" + partition
zkUtils.deletePath(znode)
debug("Consumer " + consumerIdString + " releasing " + znode)
}
private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= {
info("Releasing partition ownership")
for ((topic, infos) <- localTopicRegistry) {
for(partition <- infos.keys) {
deletePartitionOwnershipFromZK(topic, partition)
}
removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricTags(topic))
localTopicRegistry.remove(topic)
}
allTopicsOwnedPartitionsCount = 0
}
def resetState() {
topicRegistry.clear
}
def syncedRebalance() {
rebalanceLock synchronized {
rebalanceTimer.time {
for (i <- 0 until config.rebalanceMaxRetries) {
if(isShuttingDown.get()) {
return
}
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
var cluster: Cluster = null
try {
cluster = zkUtils.getCluster()
done = rebalance(cluster)
} catch {
case e: Throwable =>
/** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
* For example, a ZK node can disappear between the time we get all children and the time we try to get
* the value of a child. Just let this go since another rebalance will be triggered.
**/
info("exception during rebalance ", e)
}
info("end rebalancing consumer " + consumerIdString + " try #" + i)
if (done) {
return
} else {
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
}
// stop all fetchers and clear all the queues to avoid data duplication
closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
Thread.sleep(config.rebalanceBackoffMs)
}
}
}
throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(
group, consumerIdString, zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
val brokers = zkUtils.getAllBrokersInCluster()
if (brokers.size == 0) {
// This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
// We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
// are up.
warn("no brokers found when trying to rebalance.")
zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
true
}
else {
/**
* fetchers must be stopped to avoid data duplication, since if the current
* rebalancing attempt fails, the partitions that are released could be owned by another consumer.
* But if we don't stop the fetchers first, this consumer would continue returning data for released
* partitions in parallel. So, not stopping the fetchers leads to duplicate data.
*/
closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
if (consumerRebalanceListener != null) {
info("Invoking rebalance listener before relasing partition ownerships.")
consumerRebalanceListener.beforeReleasingPartitions(
if (topicRegistry.size == 0)
new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
else
topicRegistry.map(topics =>
topics._1 -> topics._2.keys // note this is incorrect, see KAFKA-2284
).toMap.asJava.asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
)
}
releasePartitionOwnership(topicRegistry)
val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkUtils)
val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
valueFactory = Some((_: String) => new Pool[Int, PartitionTopicInfo]))
// fetch current offsets for all topic-partitions
val topicPartitions = partitionAssignment.keySet.toSeq
val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
false
else {
val offsetFetchResponse = offsetFetchResponseOpt.get
topicPartitions.foreach(topicAndPartition => {
val (topic, partition) = topicAndPartition.asTuple
val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
val threadId = partitionAssignment(topicAndPartition)
addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
})
/**
* move the partition ownership here, since that can be used to indicate a truly successful re-balancing attempt
* A rebalancing attempt is completed successfully only after the fetchers have been started correctly
*/
if(reflectPartitionOwnershipDecision(partitionAssignment)) {
allTopicsOwnedPartitionsCount = partitionAssignment.size
partitionAssignment.view.groupBy { case (topicPartition, _) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) =>
newGauge("OwnedPartitionsCount",
new Gauge[Int] {
def value() = partitionThreadPairs.size
},
ownedPartitionsCountMetricTags(topic))
}
topicRegistry = currentTopicRegistry
// Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set.
if (consumerRebalanceListener != null) {
info("Invoking rebalance listener before starting fetchers.")
// Partition assignor returns the global partition assignment organized as a map of [TopicPartition, ThreadId]
// per consumer, and we need to re-organize it to a map of [Partition, ThreadId] per topic before passing
// to the rebalance callback.
val partitionAssginmentGroupByTopic = globalPartitionAssignment.values.flatten.groupBy[String] {
case (topicPartition, _) => topicPartition.topic
}
val partitionAssigmentMapForCallback = partitionAssginmentGroupByTopic.map({
case (topic, partitionOwnerShips) =>
val partitionOwnershipForTopicScalaMap = partitionOwnerShips.map({
case (topicAndPartition, consumerThreadId) =>
(topicAndPartition.partition: Integer) -> consumerThreadId
}).toMap
topic -> partitionOwnershipForTopicScalaMap.asJava
})
consumerRebalanceListener.beforeStartingFetchers(
consumerIdString,
partitionAssigmentMapForCallback.asJava
)
}
updateFetcher(cluster)
true
} else {
false
}
}
}
}
private def closeFetchersForQueues(cluster: Cluster,
messageStreams: Map[String,List[KafkaStream[_,_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
case Some(f) =>
f.stopConnections
clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
/**
* here, we need to commit offsets before stopping the consumer from returning any more messages
* from the current data chunk. Since partition ownership is not yet released, this commit offsets
* call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
* for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
* by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
* successfully and the fetchers restart to fetch more data chunks
**/
if (config.autoCommitEnable) {
info("Committing all offsets after clearing the fetcher queues")
commitOffsets(true)
}
case None =>
}
}
private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
messageStreams: Map[String,List[KafkaStream[_,_]]]) {
// Clear all but the currently iterated upon chunk in the consumer thread's queue
queuesTobeCleared.foreach(_.clear)
info("Cleared all relevant queues for this fetcher")
// Also clear the currently iterated upon chunk in the consumer threads
if(messageStreams != null)
messageStreams.foreach(_._2.foreach(s => s.clear()))
info("Cleared the data chunks in all the consumer message iterators")
}
private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]],
relevantTopicThreadIdsMap: Map[String, Set[ConsumerThreadId]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
}
private def updateFetcher(cluster: Cluster) {
// update partitions for fetcher
var allPartitionInfos : List[PartitionTopicInfo] = Nil
for (partitionInfos <- topicRegistry.values)
for (partition <- partitionInfos.values)
allPartitionInfos ::= partition
info("Consumer " + consumerIdString + " selected partitions : " +
allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
fetcher match {
case Some(f) =>
f.startConnections(allPartitionInfos, cluster)
case None =>
}
}
private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
var successfullyOwnedPartitions : List[(String, Int)] = Nil
val partitionOwnershipSuccessful = partitionAssignment.map { partitionOwner =>
val topic = partitionOwner._1.topic
val partition = partitionOwner._1.partition
val consumerThreadId = partitionOwner._2
val partitionOwnerPath = zkUtils.getConsumerPartitionOwnerPath(group, topic, partition)
try {
zkUtils.createEphemeralPathExpectConflict(partitionOwnerPath, consumerThreadId.toString)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
successfullyOwnedPartitions ::= (topic, partition)
true
} catch {
case _: ZkNodeExistsException =>
// The node hasn't been deleted by the original owner. So wait a bit and retry.
info("waiting for the partition ownership to be deleted: " + partition + " for topic " + topic)
false
}
}
val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
/* even if one of the partition ownership attempt has failed, return false */
if(hasPartitionOwnershipFailed > 0) {
// remove all paths that we have owned in ZK
successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
false
}
else true
}
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
partition: Int, topic: String,
offset: Long, consumerThreadId: ConsumerThreadId) {
val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
partition,
queue,
consumedOffset,
fetchedOffset,
new AtomicInteger(config.fetchMessageMaxBytes),
config.clientId)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
}
}
private def reinitializeConsumer[K,V](
topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
val dirs = new ZKGroupDirs(config.groupId)
// listener to consumer and partition changes
if (loadBalancerListener == null) {
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
loadBalancerListener = new ZKRebalancerListener(
config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
}
// create listener for session expired event if not exist yet
if (sessionExpirationListener == null)
sessionExpirationListener = new ZKSessionExpireListener(
dirs, consumerIdString, topicCount, loadBalancerListener)
// create listener for topic partition change event if not exist yet
if (topicPartitionChangeListener == null)
topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
// map of {topic -> Set(thread-1, thread-2, ...)}
val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
topicCount.getConsumerThreadIdsPerTopic
val allQueuesAndStreams = topicCount match {
case _: WildcardTopicCount =>
/*
* Wild-card consumption streams share the same queues, so we need to
* duplicate the list for the subsequent zip operation.
*/
(1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
case _: StaticTopicCount =>
queuesAndStreams
}
val topicThreadIds = consumerThreadIdsPerTopic.map { case (topic, threadIds) =>
threadIds.map((topic, _))
}.flatten
require(topicThreadIds.size == allQueuesAndStreams.size,
"Mismatch between thread ID count (%d) and queue count (%d)"
.format(topicThreadIds.size, allQueuesAndStreams.size))
val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)
threadQueueStreamPairs.foreach(e => {
val topicThreadId = e._1
val q = e._2._1
topicThreadIdAndQueues.put(topicThreadId, q)
debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
newGauge(
"FetchQueueSize",
new Gauge[Int] {
def value = q.size
},
Map("clientId" -> config.clientId,
"topic" -> topicThreadId._1,
"threadId" -> topicThreadId._2.threadId.toString)
)
})
val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
groupedByTopic.foreach(e => {
val topic = e._1
val streams = e._2.map(_._2._2).toList
topicStreamsMap += (topic -> streams)
debug("adding topic %s and %d streams to map.".format(topic, streams.size))
})
// listener to consumer and partition changes
zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
topicStreamsMap.foreach { topicAndStreams =>
// register on broker partition path changes
val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
zkUtils.zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
}
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
}
class WildcardStreamsHandler[K,V](topicFilter: TopicFilter,
numStreams: Int,
keyDecoder: Decoder[K],
valueDecoder: Decoder[V])
extends TopicEventHandler[String] {
if (messageStreamCreated.getAndSet(true))
throw new RuntimeException("Each consumer connector can create " +
"message streams by filter at most once.")
private val wildcardQueuesAndStreams = (1 to numStreams)
.map(_ => {
val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](queue,
config.consumerTimeoutMs,
keyDecoder,
valueDecoder,
config.clientId)
(queue, stream)
}).toList
// bootstrap with existing topics
private var wildcardTopics =
zkUtils.getChildrenParentMayNotExist(BrokerTopicsPath)
.filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
private val wildcardTopicCount = TopicCount.constructTopicCount(
consumerIdString, topicFilter, numStreams, zkUtils, config.excludeInternalTopics)
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
/*
* Topic events will trigger subsequent synced rebalances.
*/
info("Creating topic event watcher for topics " + topicFilter)
wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkUtils, this)
def handleTopicEvent(allTopics: Seq[String]) {
debug("Handling topic event")
val updatedTopics = allTopics.filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
val addedTopics = updatedTopics filterNot (wildcardTopics contains)
if (addedTopics.nonEmpty)
info("Topic event: added topics = %s"
.format(addedTopics))
/*
* TODO: Deleted topics are interesting (and will not be a concern until
* 0.8 release). We may need to remove these topics from the rebalance
* listener's map in reinitializeConsumer.
*/
val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
if (deletedTopics.nonEmpty)
info("Topic event: deleted topics = %s"
.format(deletedTopics))
wildcardTopics = updatedTopics
info("Topics to consume = %s".format(wildcardTopics))
if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
}
def streams: Seq[KafkaStream[K,V]] =
wildcardQueuesAndStreams.map(_._2)
}
}