blob: eb035f2674708de5b9dc13032d228d3b0260f479 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import scala.collection._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, CoreUtils}
import kafka.common.KafkaException
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]]
def getTopicCountMap: Map[String, Int]
def pattern: String
}
case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
override def toString = "%s-%d".format(consumer, threadId)
def compare(that: ConsumerThreadId) = toString.compare(that.toString)
}
private[kafka] object TopicCount extends Logging {
val whiteListPattern = "white_list"
val blackListPattern = "black_list"
val staticPattern = "static"
def makeThreadId(consumerIdString: String, threadId: Int) = consumerIdString + "-" + threadId
def makeConsumerThreadIdsPerTopic(consumerIdString: String,
topicCountMap: Map[String, Int]) = {
val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
for ((topic, nConsumers) <- topicCountMap) {
val consumerSet = new mutable.HashSet[ConsumerThreadId]
assert(nConsumers >= 1)
for (i <- 0 until nConsumers)
consumerSet += ConsumerThreadId(consumerIdString, i)
consumerThreadIdsPerTopicMap.put(topic, consumerSet)
}
consumerThreadIdsPerTopicMap
}
def constructTopicCount(group: String, consumerId: String, zkUtils: ZkUtils, excludeInternalTopics: Boolean) : TopicCount = {
val dirs = new ZKGroupDirs(group)
val topicCountString = zkUtils.readData(dirs.consumerRegistryDir + "/" + consumerId)._1
var subscriptionPattern: String = null
var topMap: Map[String, Int] = null
try {
Json.parseFull(topicCountString) match {
case Some(m) =>
val consumerRegistrationMap = m.asInstanceOf[Map[String, Any]]
consumerRegistrationMap.get("pattern") match {
case Some(pattern) => subscriptionPattern = pattern.asInstanceOf[String]
case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
}
consumerRegistrationMap.get("subscription") match {
case Some(sub) => topMap = sub.asInstanceOf[Map[String, Int]]
case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
}
case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
}
} catch {
case e: Throwable =>
error("error parsing consumer json string " + topicCountString, e)
throw e
}
val hasWhiteList = whiteListPattern.equals(subscriptionPattern)
val hasBlackList = blackListPattern.equals(subscriptionPattern)
if (topMap.isEmpty || !(hasWhiteList || hasBlackList)) {
new StaticTopicCount(consumerId, topMap)
} else {
val regex = topMap.head._1
val numStreams = topMap.head._2
val filter =
if (hasWhiteList)
new Whitelist(regex)
else
new Blacklist(regex)
new WildcardTopicCount(zkUtils, consumerId, filter, numStreams, excludeInternalTopics)
}
}
def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
new StaticTopicCount(consumerIdString, topicCount)
def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkUtils: ZkUtils, excludeInternalTopics: Boolean) =
new WildcardTopicCount(zkUtils, consumerIdString, filter, numStreams, excludeInternalTopics)
}
private[kafka] class StaticTopicCount(val consumerIdString: String,
val topicCountMap: Map[String, Int])
extends TopicCount {
def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
override def equals(obj: Any): Boolean = {
obj match {
case null => false
case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
case _ => false
}
}
def getTopicCountMap = topicCountMap
def pattern = TopicCount.staticPattern
}
private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
consumerIdString: String,
topicFilter: TopicFilter,
numStreams: Int,
excludeInternalTopics: Boolean) extends TopicCount {
def getConsumerThreadIdsPerTopic = {
val wildcardTopics = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath)
.filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}
def getTopicCountMap = Map(CoreUtils.JSONEscapeString(topicFilter.regex) -> numStreams)
def pattern: String = {
topicFilter match {
case _: Whitelist => TopicCount.whiteListPattern
case _: Blacklist => TopicCount.blackListPattern
}
}
}