blob: d4e5e24824829a15e1c1c7db62825c77b263c1d9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import joptsimple.OptionParser
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils}
object VerifyConsumerRebalance extends Logging {
def main(args: Array[String]) {
val parser = new OptionParser()
val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string.").
withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])
val groupOpt = parser.accepts("group", "Consumer group.").
withRequiredArg().ofType(classOf[String])
parser.accepts("help", "Print this message.")
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Validate that all partitions have a consumer for a given consumer group.")
val options = parser.parse(args : _*)
if (options.has("help")) {
parser.printHelpOn(System.out)
System.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val group = options.valueOf(groupOpt)
var zkClient: ZkClient = null
try {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
debug("zkConnect = %s; group = %s".format(zkConnect, group))
// check if the rebalancing operation succeeded.
try {
if(validateRebalancingOperation(zkClient, group))
println("Rebalance operation successful !")
else
println("Rebalance operation failed !")
} catch {
case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
}
}
finally {
if (zkClient != null)
zkClient.close()
}
}
private def validateRebalancingOperation(zkClient: ZkClient, group: String): Boolean = {
info("Verifying rebalancing operation for consumer group " + group)
var rebalanceSucceeded: Boolean = true
/**
* A successful rebalancing operation would select an owner for each available partition
* This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists
* under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
*/
val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
partitionsPerTopicMap.foreach { partitionsForTopic =>
val topic = partitionsForTopic._1
val partitions = partitionsForTopic._2
val topicDirs = new ZKGroupTopicDirs(group, topic)
info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
val partitionsWithOwners = ZkUtils.getChildrenParentMayNotExist(zkClient, topicDirs.consumerOwnerDir)
if(partitionsWithOwners.size == 0) {
error("No owners for any partitions for topic " + topic)
rebalanceSucceeded = false
}
debug("Children of " + topicDirs.consumerOwnerDir + " = " + partitionsWithOwners.toString)
val consumerIdsForTopic = consumersPerTopicMap.get(topic)
// for each available partition for topic, check if an owner exists
partitions.foreach { partition =>
// check if there is a node for [partition]
if(!partitionsWithOwners.exists(p => p.equals(partition))) {
error("No owner for partition [%s,%d]".format(topic, partition))
rebalanceSucceeded = false
}
// try reading the partition owner path for see if a valid consumer id exists there
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 match {
case Some(m) => m
case None => null
}
if(partitionOwner == null) {
error("No owner for partition [%s,%d]".format(topic, partition))
rebalanceSucceeded = false
}
else {
// check if the owner is a valid consumer id
consumerIdsForTopic match {
case Some(consumerIds) =>
if(!consumerIds.map(c => c.toString).contains(partitionOwner)) {
error(("Owner %s for partition [%s,%d] is not a valid member of consumer " +
"group %s").format(partitionOwner, topic, partition, group))
rebalanceSucceeded = false
}
else
info("Owner of partition [%s,%d] is %s".format(topic, partition, partitionOwner))
case None => {
error("No consumer ids registered for topic " + topic)
rebalanceSucceeded = false
}
}
}
}
}
rebalanceSucceeded
}
}