| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.tools |
| |
| import joptsimple.OptionParser |
| import org.I0Itec.zkclient.ZkClient |
| import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils} |
| |
| object VerifyConsumerRebalance extends Logging { |
| def main(args: Array[String]) { |
| val parser = new OptionParser() |
| |
| val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string."). |
| withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]) |
| val groupOpt = parser.accepts("group", "Consumer group."). |
| withRequiredArg().ofType(classOf[String]) |
| parser.accepts("help", "Print this message.") |
| |
| if(args.length == 0) |
| CommandLineUtils.printUsageAndDie(parser, "Validate that all partitions have a consumer for a given consumer group.") |
| |
| val options = parser.parse(args : _*) |
| |
| if (options.has("help")) { |
| parser.printHelpOn(System.out) |
| System.exit(0) |
| } |
| |
| CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) |
| |
| val zkConnect = options.valueOf(zkConnectOpt) |
| val group = options.valueOf(groupOpt) |
| |
| var zkClient: ZkClient = null |
| try { |
| zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) |
| |
| debug("zkConnect = %s; group = %s".format(zkConnect, group)) |
| |
| // check if the rebalancing operation succeeded. |
| try { |
| if(validateRebalancingOperation(zkClient, group)) |
| println("Rebalance operation successful !") |
| else |
| println("Rebalance operation failed !") |
| } catch { |
| case e2: Throwable => error("Error while verifying current rebalancing operation", e2) |
| } |
| } |
| finally { |
| if (zkClient != null) |
| zkClient.close() |
| } |
| } |
| |
| private def validateRebalancingOperation(zkClient: ZkClient, group: String): Boolean = { |
| info("Verifying rebalancing operation for consumer group " + group) |
| var rebalanceSucceeded: Boolean = true |
| /** |
| * A successful rebalancing operation would select an owner for each available partition |
| * This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists |
| * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id] |
| */ |
| val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false) |
| val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq) |
| |
| partitionsPerTopicMap.foreach { partitionsForTopic => |
| val topic = partitionsForTopic._1 |
| val partitions = partitionsForTopic._2 |
| val topicDirs = new ZKGroupTopicDirs(group, topic) |
| info("Alive partitions for topic %s are %s ".format(topic, partitions.toString)) |
| info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic))) |
| val partitionsWithOwners = ZkUtils.getChildrenParentMayNotExist(zkClient, topicDirs.consumerOwnerDir) |
| if(partitionsWithOwners.size == 0) { |
| error("No owners for any partitions for topic " + topic) |
| rebalanceSucceeded = false |
| } |
| debug("Children of " + topicDirs.consumerOwnerDir + " = " + partitionsWithOwners.toString) |
| val consumerIdsForTopic = consumersPerTopicMap.get(topic) |
| |
| // for each available partition for topic, check if an owner exists |
| partitions.foreach { partition => |
| // check if there is a node for [partition] |
| if(!partitionsWithOwners.exists(p => p.equals(partition))) { |
| error("No owner for partition [%s,%d]".format(topic, partition)) |
| rebalanceSucceeded = false |
| } |
| // try reading the partition owner path for see if a valid consumer id exists there |
| val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition |
| val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1 match { |
| case Some(m) => m |
| case None => null |
| } |
| if(partitionOwner == null) { |
| error("No owner for partition [%s,%d]".format(topic, partition)) |
| rebalanceSucceeded = false |
| } |
| else { |
| // check if the owner is a valid consumer id |
| consumerIdsForTopic match { |
| case Some(consumerIds) => |
| if(!consumerIds.map(c => c.toString).contains(partitionOwner)) { |
| error(("Owner %s for partition [%s,%d] is not a valid member of consumer " + |
| "group %s").format(partitionOwner, topic, partition, group)) |
| rebalanceSucceeded = false |
| } |
| else |
| info("Owner of partition [%s,%d] is %s".format(topic, partition, partitionOwner)) |
| case None => { |
| error("No consumer ids registered for topic " + topic) |
| rebalanceSucceeded = false |
| } |
| } |
| } |
| } |
| |
| } |
| |
| rebalanceSucceeded |
| } |
| } |