blob: bd8ec7ec8776501b467214ecfe69ab87e68eb63a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent.CountDownLatch
import kafka.admin._
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
import kafka.cluster._
import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
import kafka.server.ConfigType
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
import scala.collection._
object ZkUtils {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
val IsrChangeNotificationPath = "/isr_change_notification"
val EntityConfigPath = "/config"
val EntityConfigChangesPath = "/config/changes"
def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = {
val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
new ZkUtils(zkClient, zkConnection, isZkSecurityEnabled)
}
/*
* Used in tests
*/
def apply(zkClient: ZkClient, isZkSecurityEnabled: Boolean): ZkUtils = {
new ZkUtils(zkClient, null, isZkSecurityEnabled)
}
def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer)
zkClient
}
def createZkClientAndConnection(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): (ZkClient, ZkConnection) = {
val zkConnection = new ZkConnection(zkUrl, sessionTimeout)
val zkClient = new ZkClient(zkConnection, connectionTimeout, ZKStringSerializer)
(zkClient, zkConnection)
}
def DefaultAcls(isSecure: Boolean): java.util.List[ACL] = if (isSecure) {
val list = new java.util.ArrayList[ACL]
list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
list
} else {
ZooDefs.Ids.OPEN_ACL_UNSAFE
}
def maybeDeletePath(zkUrl: String, dir: String) {
try {
val zk = createZkClient(zkUrl, 30*1000, 30*1000)
zk.deleteRecursive(dir)
zk.close()
} catch {
case _: Throwable => // swallow
}
}
/*
* Get calls that only depend on static paths
*/
def getTopicPath(topic: String): String = {
ZkUtils.BrokerTopicsPath + "/" + topic
}
def getTopicPartitionsPath(topic: String): String = {
getTopicPath(topic) + "/partitions"
}
def getTopicPartitionPath(topic: String, partitionId: Int): String =
getTopicPartitionsPath(topic) + "/" + partitionId
def getTopicPartitionLeaderAndIsrPath(topic: String, partitionId: Int): String =
getTopicPartitionPath(topic, partitionId) + "/" + "state"
def getEntityConfigRootPath(entityType: String): String =
ZkUtils.EntityConfigPath + "/" + entityType
def getEntityConfigPath(entityType: String, entity: String): String =
getEntityConfigRootPath(entityType) + "/" + entity
def getDeleteTopicPath(topic: String): String =
DeleteTopicsPath + "/" + topic
}
class ZkUtils(val zkClient: ZkClient,
val zkConnection: ZkConnection,
val isSecure: Boolean) extends Logging {
// These are persistent ZK paths that should exist on kafka broker startup.
val persistentZkPaths = Seq(ConsumersPath,
BrokerIdsPath,
BrokerTopicsPath,
EntityConfigChangesPath,
getEntityConfigRootPath(ConfigType.Topic),
getEntityConfigRootPath(ConfigType.Client),
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath)
val securePersistentZkPaths = Seq(BrokerIdsPath,
BrokerTopicsPath,
EntityConfigChangesPath,
getEntityConfigRootPath(ConfigType.Topic),
getEntityConfigRootPath(ConfigType.Client),
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath)
val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)
def getController(): Int = {
readDataMaybeNull(ControllerPath)._1 match {
case Some(controller) => KafkaController.parseControllerId(controller)
case None => throw new KafkaException("Controller doesn't exist")
}
}
def getSortedBrokerList(): Seq[Int] =
getChildren(BrokerIdsPath).map(_.toInt).sorted
def getAllBrokersInCluster(): Seq[Broker] = {
val brokerIds = getChildrenParentMayNotExist(BrokerIdsPath).sorted
brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
}
def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = {
getAllBrokersInCluster().map(_.getBrokerEndPoint(protocolType))
}
def getLeaderAndIsrForPartition(topic: String, partition: Int):Option[LeaderAndIsr] = {
ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topic, partition).map(_.leaderAndIsr)
}
def setupCommonPaths() {
for(path <- persistentZkPaths)
makeSurePersistentPathExists(path)
}
def getLeaderForPartition(topic: String, partition: Int): Option[Int] = {
val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
Json.parseFull(leaderAndIsr) match {
case Some(m) =>
Some(m.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int])
case None => None
}
case None => None
}
}
/**
* This API should read the epoch in the ISR path. It is sufficient to read the epoch in the ISR path, since if the
* leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
* other broker will retry becoming leader with the same new epoch value.
*/
def getEpochForPartition(topic: String, partition: Int): Int = {
val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
Json.parseFull(leaderAndIsr) match {
case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition))
case Some(m) => m.asInstanceOf[Map[String, Any]].get("leader_epoch").get.asInstanceOf[Int]
}
case None => throw new NoEpochForPartitionException("No epoch, ISR path for partition [%s,%d] is empty"
.format(topic, partition))
}
}
/** returns a sequence id generated by updating BrokerSequenceIdPath in Zk.
* users can provide brokerId in the config , inorder to avoid conflicts between zk generated
* seqId and config.brokerId we increment zk seqId by KafkaConfig.MaxReservedBrokerId.
*/
def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = {
getSequenceId(BrokerSequenceIdPath) + MaxReservedBrokerId
}
/**
* Gets the in-sync replicas (ISR) for a specific topic and partition
*/
def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
Json.parseFull(leaderAndIsr) match {
case Some(m) => m.asInstanceOf[Map[String, Any]].get("isr").get.asInstanceOf[Seq[Int]]
case None => Seq.empty[Int]
}
case None => Seq.empty[Int]
}
}
/**
* Gets the assigned replicas (AR) for a specific topic and partition
*/
def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = {
val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
Json.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(replicaMap) => replicaMap.asInstanceOf[Map[String, Seq[Int]]].get(partition.toString) match {
case Some(seq) => seq
case None => Seq.empty[Int]
}
case None => Seq.empty[Int]
}
case None => Seq.empty[Int]
}
case None => Seq.empty[Int]
}
}
/**
* Register brokers with v3 json format (which includes multiple endpoints and rack) if
* the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise.
* Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2.
* We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade
* to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
*
* This format also includes default endpoints for compatibility with older clients.
*
* @param id broker ID
* @param host broker host name
* @param port broker port
* @param advertisedEndpoints broker end points
* @param jmxPort jmx port
* @param rack broker rack
* @param apiVersion Kafka version the broker is running as
*/
def registerBrokerInZk(id: Int,
host: String,
port: Int,
advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint],
jmxPort: Int,
rack: Option[String],
apiVersion: ApiVersion) {
val brokerIdPath = BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
val version = if (apiVersion >= KAFKA_0_10_0_IV0) 3 else 2
var jsonMap = Map("version" -> version,
"host" -> host,
"port" -> port,
"endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray,
"jmx_port" -> jmxPort,
"timestamp" -> timestamp
)
rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack))
val brokerInfo = Json.encode(jsonMap)
registerBrokerInZk(brokerIdPath, brokerInfo)
info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
}
private def registerBrokerInZk(brokerIdPath: String, brokerInfo: String) {
try {
val zkCheckedEphemeral = new ZKCheckedEphemeral(brokerIdPath,
brokerInfo,
zkConnection.getZookeeper,
isSecure)
zkCheckedEphemeral.create()
} catch {
case e: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+ ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+ "else you have shutdown this broker and restarted it faster than the zookeeper "
+ "timeout so it appears to be re-registering.")
}
}
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition
}
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
}
/**
* Get JSON partition to replica map from zookeeper.
*/
def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
Json.encode(Map("version" -> 1, "partitions" -> map))
}
/**
* make sure a persistent path exists in ZK. Create the path if not exist.
*/
def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = DefaultAcls) {
//Consumer path is kept open as different consumers will write under this node.
val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) {
ZooDefs.Ids.OPEN_ACL_UNSAFE
} else acls
if (!zkClient.exists(path))
ZkPath.createPersistent(zkClient, path, true, acl) //won't throw NoNodeException or NodeExistsException
}
/**
* create the parent path
*/
private def createParentPath(path: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
val parentDir = path.substring(0, path.lastIndexOf('/'))
if (parentDir.length != 0) {
ZkPath.createPersistent(zkClient, parentDir, true, acls)
}
}
/**
* Create an ephemeral node with the given path and data. Create parents if necessary.
*/
private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
try {
ZkPath.createEphemeral(zkClient, path, data, acls)
} catch {
case e: ZkNoNodeException => {
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
}
}
}
/**
* Create an ephemeral node with the given path and data.
* Throw NodeExistException if node already exists.
*/
def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
try {
createEphemeralPath(path, data, acls)
} catch {
case e: ZkNodeExistsException => {
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
storedData = readData(path)._1
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
case e2: Throwable => throw e2
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
throw e
} else {
// otherwise, the creation succeeded, return normally
info(path + " exists with value " + data + " during connection loss; this is ok")
}
}
case e2: Throwable => throw e2
}
}
/**
* Create an persistent node with the given path and data. Create parents if necessary.
*/
def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = DefaultAcls): Unit = {
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
case e: ZkNoNodeException => {
createParentPath(path)
ZkPath.createPersistent(zkClient, path, data, acls)
}
}
}
def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = DefaultAcls): String = {
ZkPath.createPersistentSequential(zkClient, path, data, acls)
}
/**
* Update the value of a persistent node with the given path and data.
* create parent directory if necessary. Never throw NodeExistException.
* Return the updated path zkVersion
*/
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls) = {
try {
zkClient.writeData(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(path)
try {
ZkPath.createPersistent(zkClient, path, data, acls)
} catch {
case e: ZkNodeExistsException =>
zkClient.writeData(path, data)
case e2: Throwable => throw e2
}
}
case e2: Throwable => throw e2
}
}
/**
* Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
* exist, the current version is not the expected version, etc.) return (false, -1)
*
* When there is a ConnectionLossException during the conditional update, zkClient will retry the update and may fail
* since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
* In this case, we will run the optionalChecker to further check if the previous write did indeed succeeded.
*/
def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int,
optionalChecker:Option[(ZkUtils, String, String) => (Boolean,Int)] = None): (Boolean, Int) = {
try {
val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
} catch {
case e1: ZkBadVersionException =>
optionalChecker match {
case Some(checker) => checker(this, path, data)
case _ =>
debug("Checker method is not passed skipping zkData match")
warn("Conditional update of path %s with data %s and expected version %d failed due to %s"
.format(path, data,expectVersion, e1.getMessage))
(false, -1)
}
case e2: Exception =>
warn("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
expectVersion, e2.getMessage))
(false, -1)
}
}
/**
* Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the current
* version is not the expected version, etc.) return (false, -1). If path doesn't exist, throws ZkNoNodeException
*/
def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = {
try {
val stat = zkClient.writeDataReturnStat(path, data, expectVersion)
debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
} catch {
case nne: ZkNoNodeException => throw nne
case e: Exception =>
error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
expectVersion, e.getMessage))
(false, -1)
}
}
/**
* Update the value of a persistent node with the given path and data.
* create parent directory if necessary. Never throw NodeExistException.
*/
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
try {
zkClient.writeData(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(path)
ZkPath.createEphemeral(zkClient, path, data, acls)
}
case e2: Throwable => throw e2
}
}
def deletePath(path: String): Boolean = {
try {
zkClient.delete(path)
} catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
false
case e2: Throwable => throw e2
}
}
/**
* Conditional delete the persistent path data, return true if it succeeds,
* otherwise (the current version is not the expected version)
*/
def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = {
try {
zkClient.delete(path, expectedVersion)
true
} catch {
case e: KeeperException.BadVersionException => false
}
}
def deletePathRecursive(path: String) {
try {
zkClient.deleteRecursive(path)
} catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
case e2: Throwable => throw e2
}
}
def readData(path: String): (String, Stat) = {
val stat: Stat = new Stat()
val dataStr: String = zkClient.readData(path, stat)
(dataStr, stat)
}
def readDataMaybeNull(path: String): (Option[String], Stat) = {
val stat: Stat = new Stat()
val dataAndStat = try {
(Some(zkClient.readData(path, stat)), stat)
} catch {
case e: ZkNoNodeException =>
(None, stat)
case e2: Throwable => throw e2
}
dataAndStat
}
def getChildren(path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
zkClient.getChildren(path)
}
def getChildrenParentMayNotExist(path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
try {
zkClient.getChildren(path)
} catch {
case e: ZkNoNodeException => Nil
case e2: Throwable => throw e2
}
}
/**
* Check if the given path exists
*/
def pathExists(path: String): Boolean = {
zkClient.exists(path)
}
def getCluster() : Cluster = {
val cluster = new Cluster
val nodes = getChildrenParentMayNotExist(BrokerIdsPath)
for (node <- nodes) {
val brokerZKString = readData(BrokerIdsPath + "/" + node)._1
cluster.add(Broker.createBroker(node.toInt, brokerZKString))
}
cluster
}
def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
for(topicAndPartition <- topicAndPartitions) {
ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {
case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
case None =>
}
}
ret
}
def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
topics.foreach { topic =>
val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
Json.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(repl) =>
val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
for((partition, replicas) <- replicaMap){
ret.put(TopicAndPartition(topic, partition.toInt), replicas)
debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
}
case None =>
}
case None =>
}
case None =>
}
}
ret
}
def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
topics.foreach{ topic =>
val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
val partitionMap = jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
Json.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(replicaMap) =>
val m1 = replicaMap.asInstanceOf[Map[String, Seq[Int]]]
m1.map(p => (p._1.toInt, p._2))
case None => Map[Int, Seq[Int]]()
}
case None => Map[Int, Seq[Int]]()
}
case None => Map[Int, Seq[Int]]()
}
debug("Partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
ret += (topic -> partitionMap)
}
ret
}
def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
(topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
}
}
def getPartitionsBeingReassigned(): Map[TopicAndPartition, ReassignedPartitionsContext] = {
// read the partitions and their new replica list
val jsonPartitionMapOpt = readDataMaybeNull(ReassignPartitionsPath)._1
jsonPartitionMapOpt match {
case Some(jsonPartitionMap) =>
val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
reassignedPartitions.map(p => (p._1 -> new ReassignedPartitionsContext(p._2)))
case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
}
}
// Parses without deduplicating keys so the the data can be checked before allowing reassignment to proceed
def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
Json.parseFull(jsonData) match {
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(partitionsSeq) =>
partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
val topic = p.get("topic").get.asInstanceOf[String]
val partition = p.get("partition").get.asInstanceOf[Int]
val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
TopicAndPartition(topic, partition) -> newReplicas
})
case None =>
Seq.empty
}
case None =>
Seq.empty
}
}
def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
}
def parseTopicsData(jsonData: String): Seq[String] = {
var topics = List.empty[String]
Json.parseFull(jsonData) match {
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("topics") match {
case Some(partitionsSeq) =>
val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
mapPartitionSeq.foreach(p => {
val topic = p.get("topic").get.asInstanceOf[String]
topics ++= List(topic)
})
case None =>
}
case None =>
}
topics
}
def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition,
"replicas" -> e._2))))
}
def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) {
val zkPath = ReassignPartitionsPath
partitionsToBeReassigned.size match {
case 0 => // need to delete the /admin/reassign_partitions path
deletePath(zkPath)
info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
case _ =>
val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned)
try {
updatePersistentPath(zkPath, jsonData)
debug("Updated partition reassignment path with %s".format(jsonData))
} catch {
case nne: ZkNoNodeException =>
createPersistentPath(zkPath, jsonData)
debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
}
def getPartitionsUndergoingPreferredReplicaElection(): Set[TopicAndPartition] = {
// read the partitions and their new replica list
val jsonPartitionListOpt = readDataMaybeNull(PreferredReplicaLeaderElectionPath)._1
jsonPartitionListOpt match {
case Some(jsonPartitionList) => PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(jsonPartitionList)
case None => Set.empty[TopicAndPartition]
}
}
def deletePartition(brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
zkClient.delete(brokerIdPath)
val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
zkClient.delete(brokerPartTopicPath)
}
def getConsumersInGroup(group: String): Seq[String] = {
val dirs = new ZKGroupDirs(group)
getChildren(dirs.consumerRegistryDir)
}
def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
val dirs = new ZKGroupDirs(group)
val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
for (consumer <- consumers) {
val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
for (consumerThreadId <- consumerThreadIdSet)
consumersPerTopicMap.get(topic) match {
case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
}
}
}
for ( (topic, consumerList) <- consumersPerTopicMap )
consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
consumersPerTopicMap
}
/**
* This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
* or throws an exception if the broker dies before the query to zookeeper finishes
*
* @param brokerId The broker id
* @return An optional Broker object encapsulating the broker metadata
*/
def getBrokerInfo(brokerId: Int): Option[Broker] = {
readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
case None => None
}
}
/**
* This API produces a sequence number by creating / updating given path in zookeeper
* It uses the stat returned by the zookeeper and return the version. Every time
* client updates the path stat.version gets incremented
*/
def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
try {
val stat = zkClient.writeDataReturnStat(path, "", -1)
stat.getVersion
} catch {
case e: ZkNoNodeException => {
createParentPath(BrokerSequenceIdPath, acls)
try {
zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
0
} catch {
case e: ZkNodeExistsException =>
val stat = zkClient.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
stat.getVersion
}
}
}
}
def getAllTopics(): Seq[String] = {
val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
if(topics == null)
Seq.empty[String]
else
topics
}
/**
* Returns all the entities whose configs have been overridden.
*/
def getAllEntitiesWithConfig(entityType: String): Seq[String] = {
val entities = getChildrenParentMayNotExist(getEntityConfigRootPath(entityType))
if(entities == null)
Seq.empty[String]
else
entities
}
def getAllPartitions(): Set[TopicAndPartition] = {
val topics = getChildrenParentMayNotExist(BrokerTopicsPath)
if(topics == null) Set.empty[TopicAndPartition]
else {
topics.map { topic =>
getChildren(getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic, _))
}.flatten.toSet
}
}
def getConsumerGroups() = {
getChildren(ConsumersPath)
}
def getTopicsByConsumerGroup(consumerGroup:String) = {
getChildrenParentMayNotExist(new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
}
def getAllConsumerGroupsForTopic(topic: String): Set[String] = {
val groups = getChildrenParentMayNotExist(ConsumersPath)
if (groups == null) Set.empty
else {
groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) =>
val topics = getChildren(new ZKGroupDirs(group).consumerGroupOffsetsDir)
if (topics.contains(topic)) consumerGroupsForTopic + group
else consumerGroupsForTopic
}
}
}
def close() {
if(zkClient != null) {
zkClient.close()
}
}
}
private object ZKStringSerializer extends ZkSerializer {
@throws(classOf[ZkMarshallingError])
def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8")
@throws(classOf[ZkMarshallingError])
def deserialize(bytes : Array[Byte]) : Object = {
if (bytes == null)
null
else
new String(bytes, "UTF-8")
}
}
class ZKGroupDirs(val group: String) {
def consumerDir = ConsumersPath
def consumerGroupDir = consumerDir + "/" + group
def consumerRegistryDir = consumerGroupDir + "/ids"
def consumerGroupOffsetsDir = consumerGroupDir + "/offsets"
def consumerGroupOwnersDir = consumerGroupDir + "/owners"
}
class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic
def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic
}
class ZKConfig(props: VerifiableProperties) {
/** ZK host string */
val zkConnect = props.getString("zookeeper.connect")
/** zookeeper session timeout */
val zkSessionTimeoutMs = props.getInt("zookeeper.session.timeout.ms", 6000)
/** the max time that the client waits to establish a connection to zookeeper */
val zkConnectionTimeoutMs = props.getInt("zookeeper.connection.timeout.ms",zkSessionTimeoutMs)
/** how far a ZK follower can be behind a ZK leader */
val zkSyncTimeMs = props.getInt("zookeeper.sync.time.ms", 2000)
}
object ZkPath {
@volatile private var isNamespacePresent: Boolean = false
def checkNamespace(client: ZkClient) {
if(isNamespacePresent)
return
if (!client.exists("/")) {
throw new ConfigException("Zookeeper namespace does not exist")
}
isNamespacePresent = true
}
def resetNamespaceCheckedState {
isNamespacePresent = false
}
def createPersistent(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
checkNamespace(client)
client.createPersistent(path, data, acls)
}
def createPersistent(client: ZkClient, path: String, createParents: Boolean, acls: java.util.List[ACL]) {
checkNamespace(client)
client.createPersistent(path, createParents, acls)
}
def createEphemeral(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]) {
checkNamespace(client)
client.createEphemeral(path, data, acls)
}
def createPersistentSequential(client: ZkClient, path: String, data: Object, acls: java.util.List[ACL]): String = {
checkNamespace(client)
client.createPersistentSequential(path, data, acls)
}
}
/**
* Creates an ephemeral znode checking the session owner
* in the case of conflict. In the regular case, the
* znode is created and the create call returns OK. If
* the call receives a node exists event, then it checks
* if the session matches. If it does, then it returns OK,
* and otherwise it fails the operation.
*/
class ZKCheckedEphemeral(path: String,
data: String,
zkHandle: ZooKeeper,
isSecure: Boolean) extends Logging {
private val createCallback = new CreateCallback
private val getDataCallback = new GetDataCallback
val latch: CountDownLatch = new CountDownLatch(1)
var result: Code = Code.OK
private class CreateCallback extends StringCallback {
def processResult(rc: Int,
path: String,
ctx: Object,
name: String) {
Code.get(rc) match {
case Code.OK =>
setResult(Code.OK)
case Code.CONNECTIONLOSS =>
// try again
createEphemeral
case Code.NONODE =>
error("No node for path %s (could be the parent missing)".format(path))
setResult(Code.NONODE)
case Code.NODEEXISTS =>
zkHandle.getData(path, false, getDataCallback, null)
case Code.SESSIONEXPIRED =>
error("Session has expired while creating %s".format(path))
setResult(Code.SESSIONEXPIRED)
case Code.INVALIDACL =>
error("Invalid ACL")
setResult(Code.INVALIDACL)
case _ =>
warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
setResult(Code.get(rc))
}
}
}
private class GetDataCallback extends DataCallback {
def processResult(rc: Int,
path: String,
ctx: Object,
readData: Array[Byte],
stat: Stat) {
Code.get(rc) match {
case Code.OK =>
if (stat.getEphemeralOwner != zkHandle.getSessionId)
setResult(Code.NODEEXISTS)
else
setResult(Code.OK)
case Code.NONODE =>
info("The ephemeral node [%s] at %s has gone away while reading it, ".format(data, path))
createEphemeral
case Code.SESSIONEXPIRED =>
error("Session has expired while reading znode %s".format(path))
setResult(Code.SESSIONEXPIRED)
case Code.INVALIDACL =>
error("Invalid ACL")
setResult(Code.INVALIDACL)
case _ =>
warn("ZooKeeper event while getting znode data: %s %s".format(path, Code.get(rc)))
setResult(Code.get(rc))
}
}
}
private def createEphemeral() {
zkHandle.create(path,
ZKStringSerializer.serialize(data),
DefaultAcls(isSecure),
CreateMode.EPHEMERAL,
createCallback,
null)
}
private def createRecursive(prefix: String, suffix: String) {
debug("Path: %s, Prefix: %s, Suffix: %s".format(path, prefix, suffix))
if(suffix.isEmpty()) {
createEphemeral
} else {
zkHandle.create(prefix,
new Array[Byte](0),
DefaultAcls(isSecure),
CreateMode.PERSISTENT,
new StringCallback() {
def processResult(rc : Int,
path : String,
ctx : Object,
name : String) {
Code.get(rc) match {
case Code.OK | Code.NODEEXISTS =>
// Nothing to do
case Code.CONNECTIONLOSS =>
// try again
val suffix = ctx.asInstanceOf[String]
createRecursive(path, suffix)
case Code.NONODE =>
error("No node for path %s (could be the parent missing)".format(path))
setResult(Code.get(rc))
case Code.SESSIONEXPIRED =>
error("Session has expired while creating %s".format(path))
setResult(Code.get(rc))
case Code.INVALIDACL =>
error("Invalid ACL")
setResult(Code.INVALIDACL)
case _ =>
warn("ZooKeeper event while creating registration node: %s %s".format(path, Code.get(rc)))
setResult(Code.get(rc))
}
}
},
suffix)
// Update prefix and suffix
val index = suffix.indexOf('/', 1) match {
case -1 => suffix.length
case x : Int => x
}
// Get new prefix
val newPrefix = prefix + suffix.substring(0, index)
// Get new suffix
val newSuffix = suffix.substring(index, suffix.length)
createRecursive(newPrefix, newSuffix)
}
}
private def setResult(code: Code) {
result = code
latch.countDown()
}
private def waitUntilResolved(): Code = {
latch.await()
result
}
def create() {
val index = path.indexOf('/', 1) match {
case -1 => path.length
case x : Int => x
}
val prefix = path.substring(0, index)
val suffix = path.substring(index, path.length)
debug(s"Path: $path, Prefix: $prefix, Suffix: $suffix")
info(s"Creating $path (is it secure? $isSecure)")
createRecursive(prefix, suffix)
val result = waitUntilResolved()
info("Result of znode creation is: %s".format(result))
result match {
case Code.OK =>
// Nothing to do
case _ =>
throw ZkException.create(KeeperException.create(result))
}
}
}