auto rebalance last commit
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 00a1f98..f12ffc2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -603,7 +603,7 @@
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -612,7 +612,7 @@
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions)
+      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
       deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
     }
   }
@@ -914,7 +914,8 @@
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+                                                   isTriggeredByAutoRebalance : Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -925,7 +926,8 @@
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    if (!isTriggeredByAutoRebalance)
+      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -1090,6 +1092,7 @@
             topicsNotInPreferredReplica =
               topicAndPartitionsForBroker.filter {
                 case(topicPartition, replicas) => {
+                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                   controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                 }
               }
@@ -1102,26 +1105,19 @@
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            inLock(controllerContext.controllerLock) {
-              // do this check only if the broker is live and there are no partitions being reassigned currently
-              // and preferred replica election is not in progress
-              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                  controllerContext.partitionsBeingReassigned.size == 0 &&
-                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
-                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
-                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
-                try {
-                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
-                  info("Created preferred replica election path with %s".format(jsonData))
-                } catch {
-                  case e2: ZkNodeExistsException =>
-                    val partitionsUndergoingPreferredReplicaElection =
-                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
-                    error("Preferred replica leader election currently in progress for " +
-                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
-                  case e3: Throwable =>
-                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+            topicsNotInPreferredReplica.foreach {
+              case(topicPartition, replicas) => {
+                inLock(controllerContext.controllerLock) {
+                  // do this check only if the broker is live and there are no partitions being reassigned currently
+                  // and preferred replica election is not in progress
+                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                      controllerContext.partitionsBeingReassigned.size == 0 &&
+                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+                      !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
+                      controllerContext.allTopics.contains(topicPartition.topic)) {
+                    onPreferredReplicaElection(Set(topicPartition), false)
+                  }
                 }
               }
             }