blob: 2ef13d49c8ae99dc0b0831d137517d627293e029 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import scala.collection._
import org.I0Itec.zkclient.ZkClient
import java.util.regex.Pattern
import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging}
private[kafka] trait TopicCount {
def getConsumerThreadIdsPerTopic: Map[String, Set[String]]
def dbString: String
protected def makeConsumerThreadIdsPerTopic(consumerIdString: String,
topicCountMap: Map[String, Int]) = {
val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]()
for ((topic, nConsumers) <- topicCountMap) {
val consumerSet = new mutable.HashSet[String]
assert(nConsumers >= 1)
for (i <- 0 until nConsumers)
consumerSet += consumerIdString + "-" + i
consumerThreadIdsPerTopicMap.put(topic, consumerSet)
}
consumerThreadIdsPerTopicMap
}
}
private[kafka] object TopicCount extends Logging {
/*
* Example of whitelist topic count stored in ZooKeeper:
* Topics with whitetopic as prefix, and four streams: *4*whitetopic.*
*
* Example of blacklist topic count stored in ZooKeeper:
* Topics with blacktopic as prefix, and four streams: !4!blacktopic.*
*/
val WHITELIST_MARKER = "*"
val BLACKLIST_MARKER = "!"
private val WHITELIST_PATTERN =
Pattern.compile("""\*(\p{Digit}+)\*(.*)""")
private val BLACKLIST_PATTERN =
Pattern.compile("""!(\p{Digit}+)!(.*)""")
def constructTopicCount(group: String,
consumerId: String,
zkClient: ZkClient) : TopicCount = {
val dirs = new ZKGroupDirs(group)
val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER)
val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER)
if (hasWhitelist || hasBlacklist)
info("Constructing topic count for %s from %s using %s as pattern."
.format(consumerId, topicCountString,
if (hasWhitelist) WHITELIST_PATTERN else BLACKLIST_PATTERN))
if (hasWhitelist || hasBlacklist) {
val matcher = if (hasWhitelist)
WHITELIST_PATTERN.matcher(topicCountString)
else
BLACKLIST_PATTERN.matcher(topicCountString)
require(matcher.matches())
val numStreams = matcher.group(1).toInt
val regex = matcher.group(2)
val filter = if (hasWhitelist)
new Whitelist(regex)
else
new Blacklist(regex)
new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
}
else {
var topMap : Map[String,Int] = null
try {
SyncJSON.parseFull(topicCountString) match {
case Some(m) => topMap = m.asInstanceOf[Map[String,Int]]
case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString)
}
}
catch {
case e =>
error("error parsing consumer json string " + topicCountString, e)
throw e
}
new StaticTopicCount(consumerId, topMap)
}
}
def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
new StaticTopicCount(consumerIdString, topicCount)
def constructTopicCount(consumerIdString: String,
filter: TopicFilter,
numStreams: Int,
zkClient: ZkClient) =
new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
}
private[kafka] class StaticTopicCount(val consumerIdString: String,
val topicCountMap: Map[String, Int])
extends TopicCount {
def getConsumerThreadIdsPerTopic =
makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
override def equals(obj: Any): Boolean = {
obj match {
case null => false
case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
case _ => false
}
}
/**
* return json of
* { "topic1" : 4,
* "topic2" : 4
* }
*/
def dbString = {
val builder = new StringBuilder
builder.append("{ ")
var i = 0
for ( (topic, nConsumers) <- topicCountMap) {
if (i > 0)
builder.append(",")
builder.append("\"" + topic + "\": " + nConsumers)
i += 1
}
builder.append(" }")
builder.toString()
}
}
private[kafka] class WildcardTopicCount(zkClient: ZkClient,
consumerIdString: String,
topicFilter: TopicFilter,
numStreams: Int) extends TopicCount {
def getConsumerThreadIdsPerTopic = {
val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(
zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
makeConsumerThreadIdsPerTopic(consumerIdString,
Map(wildcardTopics.map((_, numStreams)): _*))
}
def dbString = {
val marker = topicFilter match {
case wl: Whitelist => TopicCount.WHITELIST_MARKER
case bl: Blacklist => TopicCount.BLACKLIST_MARKER
}
"%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex)
}
}