blob: 8ff4bd5a5f6ea1a51df926c31155251bcc109238 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util.Random
import java.util.Properties
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.utils.{Logging, ZkUtils, Json}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection._
import mutable.ListBuffer
import scala.collection.mutable
import kafka.common._
import scala.Predef._
import collection.Map
import scala.Some
import collection.Set
object AdminUtils extends Logging {
val rand = new Random
val TopicConfigChangeZnodePrefix = "config_change_"
/**
* There are 2 goals of replica assignment:
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
*
* To achieve this goal, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
* Here is an example of assigning
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
* p4 p0 p1 p2 p3 (2nd replica)
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3nd replica)
* p7 p8 p9 p5 p6 (3nd replica)
*/
def assignReplicasToBrokers(brokerList: Seq[Int],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1)
: Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (replicationFactor > brokerList.size)
throw new AdminOperationException("replication factor: " + replicationFactor +
" larger than available brokers: " + brokerList.size)
val ret = new mutable.HashMap[Int, List[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
ret.put(currentPartitionId, replicaList.reverse)
currentPartitionId = currentPartitionId + 1
}
ret.toMap
}
def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
val existingReplicaList = existingPartitionsReplicaList.head._2
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
if (unmatchedRepFactorList.size != 0)
throw new AdminOperationException("The replication factor in manual replication assignment " +
" is not equal to the existing replication factor for the topic " + existingReplicaList.size)
info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
var partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
var partitionId = startPartitionId
partitionList = partitionList.takeRight(partitionList.size - partitionId)
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
if (brokerList.size <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
if (!brokerList.toSet.subsetOf(availableBrokerList))
throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret.put(partitionId, brokerList.toList)
if (ret(partitionId).size != ret(startPartitionId).size)
throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
partitionId = partitionId + 1
}
ret.toMap
}
def deleteTopic(zkClient: ZkClient, topic: String) {
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
}
def topicExists(zkClient: ZkClient, topic: String): Boolean =
zkClient.exists(ZkUtils.getTopicPath(topic))
def createTopic(zkClient: ZkClient,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties) {
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
}
def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
// validate arguments
Topic.validate(topic)
LogConfig.validate(config)
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")
val topicPath = ZkUtils.getTopicPath(topic)
if(!update && zkClient.exists(topicPath))
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
// write out the config if there is any, this isn't transactional with the partition assignments
writeTopicConfig(zkClient, topic, config)
// create the partition assignment
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
}
private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
try {
val zkPath = ZkUtils.getTopicPath(topic)
val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2)))
if (!update) {
info("Topic creation " + jsonPartitionData.toString)
ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
} else {
info("Topic update " + jsonPartitionData.toString)
ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)
}
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
} catch {
case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
case e2: Throwable => throw new AdminOperationException(e2.toString)
}
}
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
if(!topicExists(zkClient, topic))
throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
// remove the topic overrides
LogConfig.validate(configs)
// write the new config--may not exist if there were previously no overrides
writeTopicConfig(zkClient, topic, configs)
// create the change notification
zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic))
}
/**
* Write out the topic config to zk, if there is any
*/
private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) {
val configMap: mutable.Map[String, String] = {
import JavaConversions._
config
}
val map = Map("version" -> 1, "config" -> configMap)
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map))
}
/**
* Read the topic config (if any) from zk
*/
def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = {
val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true)
val props = new Properties()
if(str != null) {
Json.parseFull(str) match {
case None => // there are no config overrides
case Some(map: Map[String, _]) =>
require(map("version") == 1)
map.get("config") match {
case Some(config: Map[String, String]) =>
for((k,v) <- config)
props.setProperty(k, v)
case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
}
case o => throw new IllegalArgumentException("Unexpected value in config: " + str)
}
}
props
}
def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] =
ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
}
private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
val partitionMetadata = sortedPartitions.map { partitionMap =>
val partition = partitionMap._1
val replicas = partitionMap._2
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
var leaderInfo: Option[Broker] = None
var replicaInfo: Seq[Broker] = Nil
var isrInfo: Seq[Broker] = Nil
try {
leaderInfo = leader match {
case Some(l) =>
try {
Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
} catch {
case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
}
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
try {
replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
} catch {
case e: Throwable => throw new ReplicaNotAvailableException(e)
}
if(replicaInfo.size < replicas.size)
throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
if(isrInfo.size < inSyncReplicas.size)
throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
case e: Throwable =>
debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
}
new TopicMetadata(topic, partitionMetadata)
} else {
// topic doesn't exist, send appropriate error code
new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
}
}
private def getBrokerInfoFromCache(zkClient: ZkClient,
cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
brokerIds: Seq[Int]): Seq[Broker] = {
var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
val brokerMetadata = brokerIds.map { id =>
val optionalBrokerInfo = cachedBrokerInfo.get(id)
optionalBrokerInfo match {
case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
case None => // fetch it from zookeeper
ZkUtils.getBrokerInfo(zkClient, id) match {
case Some(brokerInfo) =>
cachedBrokerInfo += (id -> brokerInfo)
Some(brokerInfo)
case None =>
failedBrokerIds += id
None
}
}
}
brokerMetadata.filter(_.isDefined).map(_.get)
}
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
}