* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package kafka.utils
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer
import kafka.cluster.{Broker, Cluster}
import scala.collection._
import java.util.Properties
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import kafka.consumer.TopicCount
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
* make sure a persistent path exists in ZK. Create the path if not exist.
def makeSurePersistentPathExists(client: ZkClient, path: String) {
if (!client.exists(path))
client.createPersistent(path, true) // won't throw NoNodeException or NodeExistsException
* create the parent path
private def createParentPath(client: ZkClient, path: String): Unit = {
val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0)
client.createPersistent(parentDir, true)
* Create an ephemeral node with the given path and data. Create parents if necessary.
private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.createEphemeral(path, data)
catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createEphemeral(path, data)
* Create an ephemeral node with the given path and data.
* Throw NodeExistException if node already exists.
def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
try {
createEphemeralPath(client, path, data)
catch {
case e: ZkNodeExistsException => {
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
storedData = readData(client, path)
catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
case e2 => throw e2
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
throw e
else {
// otherwise, the creation succeeded, return normally
info(path + " exists with value " + data + " during connection loss; this is ok")
case e2 => throw e2
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.writeData(path, data)
catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
client.createPersistent(path, data)
catch {
case e: ZkNodeExistsException => client.writeData(path, data)
case e2 => throw e2
case e2 => throw e2
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.writeData(path, data)
catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createEphemeral(path, data)
case e2 => throw e2
def deletePath(client: ZkClient, path: String) {
try {
catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
case e2 => throw e2
def deletePathRecursive(client: ZkClient, path: String) {
try {
catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
case e2 => throw e2
def readData(client: ZkClient, path: String): String = {
def readDataMaybeNull(client: ZkClient, path: String): String = {
client.readData(path, true)
def getChildren(client: ZkClient, path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
var ret: java.util.List[String] = null
try {
ret = client.getChildren(path)
catch {
case e: ZkNoNodeException =>
return Nil
case e2 => throw e2
return ret
* Check if the given path exists
def pathExists(client: ZkClient, path: String): Boolean = {
def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1)
def getCluster(zkClient: ZkClient) : Cluster = {
val cluster = new Cluster
val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
for (node <- nodes) {
val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
cluster.add(Broker.createBroker(node.toInt, brokerZKString))
def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, List[String]] = {
val ret = new mutable.HashMap[String, List[String]]()
for (topic <- topics) {
var partList: List[String] = Nil
val brokers = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath + "/" + topic)
for (broker <- brokers) {
val nParts = readData(zkClient, BrokerTopicsPath + "/" + topic + "/" + broker).toInt
for (part <- 0 until nParts)
partList ::= broker + "-" + part
partList = partList.sortWith((s,t) => s < t)
ret += (topic -> partList)
def setupPartition(zkClient : ZkClient, brokerId: Int, host: String, port: Int, topic: String, nParts: Int) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
val broker = new Broker(brokerId, brokerId.toString, host, port)
createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
createEphemeralPathExpectConflict(zkClient, brokerPartTopicPath, nParts.toString)
def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] = {
val dirs = new ZKGroupDirs(group)
getChildren(zkClient, dirs.consumerRegistryDir)
def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
val dirs = new ZKGroupDirs(group)
val consumersInGroup = getConsumersInGroup(zkClient, group)
val topicCountMaps = => TopicCount.constructTopicCount(consumerId,
ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
def getConsumersPerTopic(zkClient: ZkClient, group: String) : mutable.Map[String, List[String]] = {
val dirs = new ZKGroupDirs(group)
val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
for (consumer <- consumers) {
val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient)
for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
for (consumerThreadId <- consumerThreadIdSet)
consumersPerTopicMap.get(topic) match {
case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
for ( (topic, consumerList) <- consumersPerTopicMap )
consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
object ZKStringSerializer extends ZkSerializer {
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
def deserialize(bytes : Array[Byte]) : Object = {
if (bytes == null)
new String(bytes, "UTF-8")
class ZKGroupDirs(val group: String) {
def consumerDir = ZkUtils.ConsumersPath
def consumerGroupDir = consumerDir + "/" + group
def consumerRegistryDir = consumerGroupDir + "/ids"
class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
class ZKConfig(props: Properties) {
/** ZK host string */
val zkConnect = Utils.getString(props, "zk.connect", null)
/** zookeeper session timeout */
val zkSessionTimeoutMs = Utils.getInt(props, "", 6000)
/** the max time that the client waits to establish a connection to zookeeper */
val zkConnectionTimeoutMs = Utils.getInt(props, "",zkSessionTimeoutMs)
/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = Utils.getInt(props, "", 2000)