package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import scala.collection._
import scala.collection.JavaConversions._
import kafka.common.Topic
import kafka.cluster.Broker
object TopicCommand {
def main(args: Array[String]): Unit = {
val opts = new TopicCommandOptions(args)
// should have exactly one action
val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
if(actions != 1) {
System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
createTopic(zkClient, opts)
else if(opts.options.has(opts.alterOpt))
alterTopic(zkClient, opts)
else if(opts.options.has(opts.deleteOpt))
deleteTopic(zkClient, opts)
else if(opts.options.has(opts.listOpt))
else if(opts.options.has(opts.describeOpt))
describeTopic(zkClient, opts)
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topics = opts.options.valuesOf(opts.topicOpt)
val configs = parseTopicConfigs(opts)
for (topic <- topics) {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
println("Created topic \"%s\".".format(topic))
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
val topic = opts.options.valueOf(opts.topicOpt)
if(opts.options.has(opts.configOpt)) {
val configs = parseTopicConfigs(opts)
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
if(opts.options.has(opts.partitionsOpt)) {
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
println("adding partitions succeeded!")
Utils.croak("Changing the replication factor is not supported.")
def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
for(topic <- opts.options.valuesOf(opts.topicOpt)) {
AdminUtils.deleteTopic(zkClient, topic)
println("Topic \"%s\" deleted.".format(topic))
def listTopics(zkClient: ZkClient) {
for(topic <- ZkUtils.getAllTopics(zkClient).sorted)
def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
if (topics.size <= 0)
topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(
for (topic <- topics) {
ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (!reportUnavailablePartitions && !reportUnderReplicatedPartitions) {
val config = AdminUtils.fetchTopicConfig(zkClient, topic)
println("\tconfigs: " + => kv._1 + " = " + kv._2).mkString(", "))
println("\tpartitions: " + sortedPartitions.size)
for ((partitionId, assignedReplicas) <- sortedPartitions) {
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionId)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
(reportUnavailablePartitions && (!leader.isDefined || !liveBrokers.contains(leader.get)))) {
print("\t\ttopic: " + topic)
print("\tpartition: " + partitionId)
print("\tleader: " + (if(leader.isDefined) leader.get else "none"))
print("\treplicas: " + assignedReplicas.mkString(","))
println("\tisr: " + inSyncReplicas.mkString(","))
case None =>
println("topic " + topic + " doesn't exist!")
def formatBroker(broker: Broker) = + " (" + + ":" + broker.port + ")"
def parseTopicConfigs(opts: TopicCommandOptions): Properties = {
val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*"))
require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".")
val props = new Properties
configs.foreach(pair => props.setProperty(pair(0), pair(1)))
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
class TopicCommandOptions(args: Array[String]) {
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
val listOpt = parser.accepts("list", "List all available topics.")
val createOpt = parser.accepts("create", "Create a new topic.")
val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
val deleteOpt = parser.accepts("delete", "Delete the topic.")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.")
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
.describedAs("# of partitions")
val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created.")
.describedAs("replication factor")
val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
"if set when describing topics, only show under replicated partitions")
val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
"if set when describing topics, only show partitions whose leader is not available")
val options = parser.parse(args : _*)