/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.common._
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
import kafka.utils.ZkUtils._
import java.util.Random
import java.util.Properties
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException}
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
import scala.collection._
import JavaConverters._
import mutable.ListBuffer
import scala.collection.mutable
import collection.Map
import collection.Set
import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
val rand = new Random
val AdminClientId = "__admin_client"
val EntityConfigChangeZnodePrefix = "config_change_"
/**
* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible.
*
* To achieve these goals for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
* Here is an example of assigning 10 partitions with a replication factor of 3 across 5 brokers:
* broker-0  broker-1  broker-2  broker-3  broker-4
* p0        p1        p2        p3        p4       (1st replica)
* p5        p6        p7        p8        p9       (1st replica)
* p4        p0        p1        p2        p3       (2nd replica)
* p8        p9        p5        p6        p7       (2nd replica)
* p3        p4        p0        p1        p2       (3rd replica)
* p7        p8        p9        p5        p6       (3rd replica)
*
* To create a rack aware assignment, this API will first create a rack alternated broker list. For example,
* from this brokerID -> rack mapping:
*
* 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
*
* The rack alternated list will be:
*
* 0, 3, 1, 5, 4, 2
*
* Then an easy round-robin assignment can be applied. Assuming 6 partitions with a replication factor of 3, the
* assignment will be:
*
* 0 -> 0,3,1
* 1 -> 3,1,5
* 2 -> 1,5,4
* 3 -> 5,4,2
* 4 -> 4,2,0
* 5 -> 2,0,3
*
* Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
* shifting the followers. This is to ensure we do not always end up with the same replica sequences.
* In this case, if there is another partition to assign (partition #6), the assignment will be:
*
* 6 -> 0,4,2 (instead of repeating 0,3,1 as partition 0)
*
* The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
* broker list. For the rest of the replicas, it will be biased towards brokers on racks that do not have
* any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
* the broker list.
*
* As a result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
* each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
* situation where the number of replicas is the same as the number of racks and each rack has the same number of
* brokers, it guarantees that the replica distribution is even across brokers and racks.
*
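* A minimal usage sketch (broker ids are illustrative; the call is rack-unaware because no racks are supplied):
*
*   val brokers = (0 to 4).map(id => BrokerMetadata(id, None))
*   val assignment = AdminUtils.assignReplicasToBrokers(brokers, nPartitions = 10, replicationFactor = 3)
*   // one possible result starts with 0 -> Seq(0, 1, 2); the exact layout depends on the random start index and shift
*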
* @return a Map from partition id to replica ids
* @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
* assign each replica to a unique rack.
*
*/
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (replicationFactor > brokerMetadatas.size)
throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
if (brokerMetadatas.forall(_.rack.isEmpty))
assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
startPartitionId)
else {
if (brokerMetadatas.exists(_.rack.isEmpty))
throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId)
}
}
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
private def assignReplicasToBrokersRackAware(nPartitions: Int,
replicationFactor: Int,
brokerMetadatas: Seq[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
id -> rack
}.toMap
val numRacks = brokerRackMap.values.toSet.size
val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
val numBrokers = arrangedBrokerList.size
val ret = mutable.Map[Int, Seq[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
val leader = arrangedBrokerList(firstReplicaIndex)
val replicaBuffer = mutable.ArrayBuffer(leader)
val racksWithReplicas = mutable.Set(brokerRackMap(leader))
val brokersWithReplicas = mutable.Set(leader)
var k = 0
for (_ <- 0 until replicationFactor - 1) {
var done = false
while (!done) {
val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
val rack = brokerRackMap(broker)
// Skip this broker if
// 1. there is already a broker in the same rack that has been assigned a replica AND there are one or more racks
//    that do not have any replica yet, or
// 2. the broker has already been assigned a replica AND there are one or more brokers that do not have a replica yet
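// For example, with the rack mapping from the assignReplicasToBrokers comment (rack1: brokers 0 and 5,
// rack2: 3 and 4, rack3: 1 and 2) and leader 0, broker 5 is skipped until rack2 and rack3 each hold a replica.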
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
replicaBuffer += broker
racksWithReplicas += rack
brokersWithReplicas += broker
done = true
}
k += 1
}
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
/**
* Given broker and rack information, returns a list of brokers alternated by rack. Assume
* these are the racks and their brokers:
*
* rack1: 0, 1, 2
* rack2: 3, 4, 5
* rack3: 6, 7, 8
*
* This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
*
* This is essential to make sure that the assignReplicasToBrokers API can use such a list and
* assign replicas to brokers in a simple round-robin fashion, while ensuring an even
* distribution of leader and replica counts on each broker and that replicas are
* distributed to all racks.
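*
* A sketch of the corresponding call (the mapping mirrors the example above):
*
*   val rackMap = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack2", 4 -> "rack2", 5 -> "rack2",
*                     6 -> "rack3", 7 -> "rack3", 8 -> "rack3")
*   getRackAlternatedBrokerList(rackMap) // => 0, 3, 6, 1, 4, 7, 2, 5, 8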
*/
private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
(rack, brokers.toIterator)
}
val racks = brokersIteratorByRack.keys.toArray.sorted
val result = new mutable.ArrayBuffer[Int]
var rackIndex = 0
while (result.size < brokerRackMap.size) {
val rackIterator = brokersIteratorByRack(racks(rackIndex))
if (rackIterator.hasNext)
result += rackIterator.next()
rackIndex = (rackIndex + 1) % racks.length
}
result
}
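// Inverts a broker -> rack map into a rack -> sorted broker ids map, e.g. (illustrative ids)
// Map(0 -> "rack1", 3 -> "rack2", 1 -> "rack1") becomes Map("rack1" -> Seq(0, 1), "rack2" -> Seq(3)).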
private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = {
brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
.groupBy { case (rack, _) => rack }
.map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
}
/**
* Add partitions to an existing topic, with an optional manual replica assignment
*
* @param zkUtils Zookeeper utilities
* @param topic Topic for adding partitions to
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
* @param checkBrokerAvailable Whether to check that the assigned replica brokers are available. Disabled only in tests
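*
* A usage sketch (assumes a connected ZkUtils instance and an existing topic named "my-topic"):
*
*   AdminUtils.addPartitions(zkUtils, "my-topic", numPartitions = 4)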
*/
def addPartitions(zkUtils: ZkUtils,
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
case None => throw new AdminOperationException("The topic %s does not have a partition with id 0; this should never happen".format(topic))
case Some(headPartitionReplica) => headPartitionReplica._2
}
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val newPartitionReplicaList =
if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
startIndex, existingPartitionsReplicaList.size)
}
else
getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
existingPartitionsReplicaList.size, checkBrokerAvailable)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(_.size != existingReplicaListForPartitionZero.size)
if (unmatchedRepFactorList.nonEmpty)
throw new AdminOperationException("The replication factor in the manual replica assignment is not equal to the " +
"existing replication factor for the topic (" + existingReplicaListForPartitionZero.size + ")")
info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
}
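// Parses a manual assignment string with one colon-separated replica list per partition, e.g. (illustrative ids)
//   getManualReplicaAssignment("0:1,1:2,2:0", Set(0, 1, 2), startPartitionId = 0)
// returns Map(0 -> List(0, 1), 1 -> List(1, 2), 2 -> List(2, 0)).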
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
var partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret.put(partitionId, brokerList.toList)
if (ret(partitionId).size != ret(startPartitionId).size)
throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
partitionId = partitionId + 1
}
ret.toMap
}
def deleteTopic(zkUtils: ZkUtils, topic: String) {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
} catch {
case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
"topic %s is already marked for deletion".format(topic))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
zkUtils.getConsumersInGroup(group).nonEmpty
}
/**
* Delete the whole directory of the given consumer group if the group is inactive.
*
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupDirs(group)
zkUtils.deletePathRecursive(dir.consumerGroupDir)
true
}
else false
}
/**
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
* If the consumer group consumes no other topics, delete the whole consumer group directory.
*
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
*/
def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics == Seq(topic)) {
deleteConsumerGroupInZK(zkUtils, group)
}
else if (!isConsumerGroupActive(zkUtils, group)) {
val dir = new ZKGroupTopicDirs(group, topic)
zkUtils.deletePathRecursive(dir.consumerOwnerDir)
zkUtils.deletePathRecursive(dir.consumerOffsetDir)
true
}
else false
}
/**
* Delete every inactive consumer group's information about the given topic in Zookeeper.
*
* @param zkUtils Zookeeper utilities
* @param topic Topic of the consumer group information we wish to delete
*/
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
}
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))
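/**
* Return the BrokerMetadata for the given broker ids (or for all brokers in the cluster), honoring the
* requested RackAwareMode: Enforced throws if rack information is only partially available, Safe silently
* falls back to rack-unaware metadata in that case, and Disabled always drops rack information.
*/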
def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
val allBrokers = zkUtils.getAllBrokersInCluster()
val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
val brokersWithRack = brokers.filter(_.rack.nonEmpty)
if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
" to make replica assignment without rack information.")
}
val brokerMetadatas = rackAwareMode match {
case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
brokers.map(broker => BrokerMetadata(broker.id, None))
case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
}
brokerMetadatas.sortBy(_.id)
}
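// A usage sketch for topic creation (assumes a connected ZkUtils instance; names and sizes are illustrative):
//   val topicConfig = new Properties()
//   topicConfig.put(LogConfig.RetentionMsProp, "86400000") // retain data for one day
//   AdminUtils.createTopic(zkUtils, "my-topic", partitions = 8, replicationFactor = 3, topicConfig)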
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
// validate arguments
Topic.validate(topic)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
val topicPath = getTopicPath(topic)
if (!update) {
if (zkUtils.zkClient.exists(topicPath))
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
else if (Topic.hasCollisionChars(topic)) {
val allTopics = zkUtils.getAllTopics()
val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
if (collidingTopics.nonEmpty) {
throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
}
}
}
partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
// Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
LogConfig.validate(config)
writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
}
// create the partition assignment
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = getTopicPath(topic)
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
if (!update) {
info("Topic creation " + jsonPartitionData.toString)
zkUtils.createPersistentPath(zkPath, jsonPartitionData)
} else {
info("Topic update " + jsonPartitionData.toString)
zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
}
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the client. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
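* A usage sketch (the quota key shown is the standard client override property; the value is illustrative):
*
*   val props = new Properties()
*   props.put("producer_byte_rate", "1048576")
*   AdminUtils.changeClientIdConfig(zkUtils, "my-client", props)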
*/
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) {
changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
}
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
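* A usage sketch that preserves existing overrides (the config key is a standard topic-level property):
*
*   val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, "my-topic")
*   props.put("max.message.bytes", "2097152")
*   AdminUtils.changeTopicConfig(zkUtils, "my-topic", props)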
*/
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
if(!topicExists(zkUtils, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
// validate the new config before writing it out
LogConfig.validate(configs)
changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
}
private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) {
// write the new config (the znode may not exist if there were previously no overrides)
writeEntityConfig(zkUtils, entityType, entityName, configs)
// create the change notification
val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix
val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
zkUtils.zkClient.createPersistentSequential(seqNode, content)
}
def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = {
Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName)
}
/**
* Write out the topic config to zk, if there is any
*/
private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, config: Properties) {
val configMap: mutable.Map[String, String] = config.asScala
val map = Map("version" -> 1, "config" -> configMap)
zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map))
}
/**
* Read the entity (topic or client) config (if any) from zk
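* The stored payload, written by writeEntityConfig, has the form {"version": 1, "config": {"key": "value", ...}}.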
*/
def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = {
val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
case None => // there are no config overrides
case Some(mapAnon: Map[_, _]) =>
val map = mapAnon collect { case (k: String, v: Any) => k -> v }
require(map("version") == 1)
map.get("config") match {
case Some(config: Map[_, _]) =>
for(configTup <- config)
configTup match {
case (k: String, v: String) =>
props.setProperty(k, v)
case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str)
}
case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)"
.format(str, entityType, entity))
}
}
props
}
def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
}
private def fetchTopicMetadataFromZk(topic: String,
zkUtils: ZkUtils,
cachedBrokerInfo: mutable.HashMap[Int, Broker],
protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = {
if(zkUtils.pathExists(getTopicPath(topic))) {
val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
val partitionMetadata = sortedPartitions.map { partitionMap =>
val partition = partitionMap._1
val replicas = partitionMap._2
val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partition)
val leader = zkUtils.getLeaderForPartition(topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
var leaderInfo: Node = Node.noNode()
var replicaInfo: Seq[Node] = Nil
var isrInfo: Seq[Node] = Nil
try {
leaderInfo = leader match {
case Some(l) =>
try {
getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(protocol)
} catch {
case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
}
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
try {
replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(protocol))
isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(protocol))
} catch {
case e: Throwable => throw new ReplicaNotAvailableException(e)
}
if(replicaInfo.size < replicas.size)
throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
if(isrInfo.size < inSyncReplicas.size)
throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
new MetadataResponse.PartitionMetadata(Errors.NONE, partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
} catch {
case e: Throwable =>
debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new MetadataResponse.PartitionMetadata(Errors.forException(e), partition, leaderInfo, replicaInfo.asJava, isrInfo.asJava)
}
}
new MetadataResponse.TopicMetadata(Errors.NONE, topic, partitionMetadata.toList.asJava)
} else {
// topic doesn't exist, send appropriate error code
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, java.util.Collections.emptyList())
}
}
private def getBrokerInfoFromCache(zkUtils: ZkUtils,
cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
brokerIds: Seq[Int]): Seq[Broker] = {
var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
val brokerMetadata = brokerIds.map { id =>
val optionalBrokerInfo = cachedBrokerInfo.get(id)
optionalBrokerInfo match {
case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
case None => // fetch it from zookeeper
zkUtils.getBrokerInfo(id) match {
case Some(brokerInfo) =>
cachedBrokerInfo += (id -> brokerInfo)
Some(brokerInfo)
case None =>
failedBrokerIds += id
None
}
}
}
brokerMetadata.flatten
}
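// Worked example: with 5 brokers, firstReplicaIndex = 0 and secondReplicaShift = 0, the follower replicas land
// on indices 1 and 2 (shift = 1, 2); bumping secondReplicaShift to 1 moves them to indices 2 and 3, which is how
// successive round-robin passes avoid repeating the same follower sets.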
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
}