auto rebalance last commit
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 00a1f98..f12ffc2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -603,7 +603,7 @@
}
}
- def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+ def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -612,7 +612,7 @@
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
- removePartitionsFromPreferredReplicaElection(partitions)
+ removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
}
}
@@ -914,7 +914,8 @@
}
}
- def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+ def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+ isTriggeredByAutoRebalance : Boolean) {
for(partition <- partitionsToBeRemoved) {
// check the status
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -925,7 +926,8 @@
warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
}
}
- ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+ if (!isTriggeredByAutoRebalance)
+ ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
}
@@ -1090,6 +1092,7 @@
topicsNotInPreferredReplica =
topicAndPartitionsForBroker.filter {
case(topicPartition, replicas) => {
+ controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
}
@@ -1102,26 +1105,19 @@
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- inLock(controllerContext.controllerLock) {
- // do this check only if the broker is live and there are no partitions being reassigned currently
- // and preferred replica election is not in progress
- if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
- controllerContext.partitionsBeingReassigned.size == 0 &&
- controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
- val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
- val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
- val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
- try {
- ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
- info("Created preferred replica election path with %s".format(jsonData))
- } catch {
- case e2: ZkNodeExistsException =>
- val partitionsUndergoingPreferredReplicaElection =
- PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
- error("Preferred replica leader election currently in progress for " +
- "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
- case e3: Throwable =>
- error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+ topicsNotInPreferredReplica.foreach {
+ case(topicPartition, replicas) => {
+ inLock(controllerContext.controllerLock) {
+ // do this check only if the broker is live and there are no partitions being reassigned currently
+ // and preferred replica election is not in progress
+ if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+ controllerContext.partitionsBeingReassigned.size == 0 &&
+ controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+ !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+ !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
+ controllerContext.allTopics.contains(topicPartition.topic)) {
+ onPreferredReplicaElection(Set(topicPartition), false)
+ }
}
}
}