| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.controller |
| |
| import kafka.api.LeaderAndIsr |
| import kafka.utils.Logging |
| import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} |
| |
/**
 * Strategy for electing a new leader (and deciding the resulting ISR) of a single partition.
 */
trait PartitionLeaderSelector {
  /**
   * Elects a new leader for the given partition.
   *
   * @param topicAndPartition   the topic and partition whose leader needs to be elected
   * @param currentLeaderAndIsr the partition's current leader and ISR, as read from zookeeper
   * @return a pair of (the leader-and-ISR carrying the newly selected leader,
   *         the replicas that the resulting leader-and-ISR request should be sent to)
   * @throws NoReplicaOnlineException if no replica in the assigned replicas list is alive;
   *                                  implementations may also signal that no suitable leader could
   *                                  be chosen with other exceptions (e.g. StateChangeFailedException)
   */
  def selectLeader(topicAndPartition: TopicAndPartition,
                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
| |
/**
 * Selects a new leader for a partition whose leader has gone offline:
 *  1. If at least one broker from the ISR is alive, the first live ISR member becomes the leader
 *     and the new ISR is the set of live ISR members.
 *  2. Otherwise, the first live broker from the assigned replica list becomes the leader and the
 *     new ISR contains only that broker. This is an "unclean" election that may lose data; it is
 *     logged as a warning and counted in `ControllerStats.uncleanLeaderElectionRate`.
 *  3. If no broker in the assigned replica list is alive, a [[NoReplicaOnlineException]] is thrown.
 *
 * On success the leader epoch and the zookeeper path version are both bumped by one. This selector
 * only computes the new leader and ISR; it does not itself write them anywhere.
 */
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "

  /**
   * @return the new leader and ISR, plus the live assigned replicas the request should be sent to
   * @throws NoReplicaOnlineException if the partition has no assignment or no live assigned replica
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.headOption match {
          case Some(newLeader) =>
            // Clean election: shrink the ISR to its live members and lead with the first of them.
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
              .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
          case None =>
            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(",")))
            liveAssignedReplicasToThisPartition.headOption match {
              case Some(newLeader) =>
                // Unclean election: the new leader was not in the ISR, so committed data may be lost.
                ControllerStats.uncleanLeaderElectionRate.mark()
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                  .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
              case None =>
                throw new NoReplicaOnlineException(
                  "No replica for partition %s is alive. Live brokers are: [%s], Assigned replicas are: [%s]"
                    .format(topicAndPartition, controllerContext.liveBrokerIds, assignedReplicas))
            }
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}
| |
/**
 * Picks the first reassigned replica that is both alive and currently in the ISR as the new leader.
 * The ISR itself is left unchanged; the leader epoch and zookeeper path version are bumped by one,
 * and the resulting request is addressed to the full list of reassigned replicas.
 */
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[ReassignedPartitionLeaderSelector]: "

  /**
   * The reassigned replicas are expected to already be in the ISR when selectLeader is called.
   * ISR membership is nevertheless re-checked here, so a replica that has not caught up can never
   * be elected leader even if that precondition is violated.
   *
   * NOTE(review): assumes `topicAndPartition` is currently being reassigned; the lookup in
   * `partitionsBeingReassigned` is not guarded.
   *
   * @throws StateChangeFailedException if the partition has no reassigned replicas, or if none of
   *                                    them is both alive and in the ISR
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r =>
      controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r))
    aliveReassignedInSyncReplicas.headOption match {
      case Some(newLeader) =>
        (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None if reassignedInSyncReplicas.isEmpty =>
        throw new StateChangeFailedException("List of reassigned replicas for partition " +
          "%s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      case None =>
        throw new StateChangeFailedException("None of the reassigned replicas for partition " +
          "%s are alive and in the ISR. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
    }
  }
}
| |
/**
 * Picks the preferred replica (the first replica in the partition's assignment) as the new leader if:
 *  1. it is not already the current leader, and
 *  2. it is alive and in the current ISR.
 * The ISR is left unchanged; the leader epoch and zookeeper path version are bumped by one, and the
 * resulting request is addressed to all assigned replicas.
 */
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
  with Logging {
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "

  /**
   * @throws LeaderElectionNotNeededException if the preferred replica already leads the partition
   * @throws StateChangeFailedException       if the partition has no assigned replicas, or if the
   *                                          preferred replica is not alive or not in the ISR
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // An empty assignment has no preferred replica; fail with a descriptive error instead of a bare
    // NoSuchElementException from `head`.
    val preferredReplica = assignedReplicas.headOption.getOrElse(
      throw new StateChangeFailedException("No replicas are assigned to partition %s".format(topicAndPartition)))
    // check if preferred replica is the current leader
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) {
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
        .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Triggering preferred replica leader election")
      // the preferred replica is not the current leader; it may only take over if it is alive and in the isr
      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}
| |
/**
 * Picks one of the alive replicas (other than the current leader) in the ISR as the new leader, and
 * fails if there is no such replica.
 *
 * The new ISR is the current ISR minus the current leader and minus every shutting-down broker. The
 * resulting request is addressed to the assigned replicas that are live or shutting down.
 */
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
  extends PartitionLeaderSelector
  with Logging {

  this.logIdent = "[ControlledShutdownLeaderSelector]: "

  /**
   * @throws StateChangeFailedException if no ISR member other than the current leader is alive and
   *                                    not shutting down
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion

    val currentLeader = currentLeaderAndIsr.leader

    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader &&
      !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    // ISR membership alone does not prove a broker is up. Shutting-down brokers are already excluded
    // from newIsr, so requiring "live or shutting down" here leaves only live candidates.
    val newLeaderOpt = newIsr.find(brokerId => liveOrShuttingDownBrokerIds.contains(brokerId))
    newLeaderOpt match {
      case Some(newLeader) =>
        debug("Partition %s : current leader = %d, new leader = %d"
          .format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
      case None =>
        throw new StateChangeFailedException(("No other live replicas in ISR %s for %s besides current leader %d and" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader,
          controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}
| |
/**
 * A selector that performs no election at all. It logs a warning, because it is not expected to be
 * invoked, and then hands back the partition's current leader and ISR untouched together with the
 * replicas currently assigned to the partition.
 */
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[NoOpLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    val currentAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    (currentLeaderAndIsr, currentAssignment)
  }
}