/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CountDownLatch, TimeUnit}
import com.yammer.metrics.core.Gauge
import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
import kafka.utils._
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zk._
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, ApiError, LeaderAndIsrResponse, StopReplicaResponse}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.collection._
import scala.util.{Failure, Try}
object KafkaController extends Logging {
val InitialControllerEpoch = 0
val InitialControllerEpochZkVersion = 0
/**
* ControllerEventThread will shut down once it sees this event
*/
private[controller] case object ShutdownEventThread extends ControllerEvent {
def state = ControllerState.ControllerShutdown
override def process(): Unit = ()
}
// Used only in tests
private[controller] case class AwaitOnLatch(latch: CountDownLatch) extends ControllerEvent {
override def state: ControllerState = ControllerState.ControllerChange
override def process(): Unit = latch.await()
}
}
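/**
 * The cluster controller. A single broker in the cluster is elected controller via the
 * /controller znode and drives partition/replica state changes for the whole cluster.
 * All work is funneled through the single-threaded ControllerEventManager, so event
 * handlers never run concurrently.
 *
 * Illustrative construction (a sketch; in a real broker, KafkaServer wires these
 * arguments during startup):
 * {{{
 *   val controller = new KafkaController(config, zkClient, Time.SYSTEM, new Metrics(),
 *     brokerInfo, brokerEpoch, tokenManager)
 *   controller.startup()
 * }}}
 */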
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics,
initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
@volatile private var brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
// have a separate scheduler for the controller to be able to start and stop independently of the kafka server
// visible for testing
private[controller] val kafkaScheduler = new KafkaScheduler(1)
// visible for testing
private[controller] val eventManager = new ControllerEventManager(config.brokerId,
controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
@volatile private var activeControllerId = -1
@volatile private var offlinePartitionCount = 0
@volatile private var preferredReplicaImbalanceCount = 0
@volatile private var globalTopicCount = 0
@volatile private var globalPartitionCount = 0
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "delegation-token-cleaner")
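// The gauges below are registered via KafkaMetricsGroup and are exposed through JMX as
// kafka.controller:type=KafkaController,name=<gauge name>, e.g.
// kafka.controller:type=KafkaController,name=ActiveControllerCount, which reads 1 on the
// active controller and 0 on every other broker.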
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
def value = if (isActive) 1 else 0
}
)
newGauge(
"OfflinePartitionsCount",
new Gauge[Int] {
def value: Int = offlinePartitionCount
}
)
newGauge(
"PreferredReplicaImbalanceCount",
new Gauge[Int] {
def value: Int = preferredReplicaImbalanceCount
}
)
newGauge(
"ControllerState",
new Gauge[Byte] {
def value: Byte = state.value
}
)
newGauge(
"GlobalTopicCount",
new Gauge[Int] {
def value: Int = globalTopicCount
}
)
newGauge(
"GlobalPartitionCount",
new Gauge[Int] {
def value: Int = globalPartitionCount
}
)
/**
* Returns true if this broker is the current controller.
*/
def isActive: Boolean = activeControllerId == config.brokerId
def brokerEpoch: Long = _brokerEpoch
def epoch: Int = controllerContext.epoch
/**
* Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
* is the controller. It merely registers the session expiration listener and starts the controller leader
* elector
*/
def startup() = {
zkClient.registerStateChangeHandler(new StateChangeHandler {
override val name: String = StateChangeHandlers.ControllerHandler
override def afterInitializingSession(): Unit = {
eventManager.put(RegisterBrokerAndReelect)
}
override def beforeInitializingSession(): Unit = {
val expireEvent = new Expire
eventManager.clearAndPut(expireEvent)
// Block initialization of the new session until the expiration event has started being handled,
// which ensures that all pending events have been processed before creating the new session
expireEvent.waitUntilProcessingStarted()
}
})
eventManager.put(Startup)
eventManager.start()
}
/**
* Invoked when the controller module of a Kafka server is shutting down. If the broker was the current controller,
* it shuts down the partition and replica state machines. If not, those are a no-op. In addition to that, it also
* shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
*/
def shutdown() = {
eventManager.close()
onControllerResignation()
}
/**
* On controlled shutdown, the controller first determines the partitions that the
* shutting down broker leads, and moves leadership of those partitions to another broker
* that is in that partition's ISR.
*
* @param id Id of the broker to shut down.
* @param brokerEpoch The broker epoch in the controlled shutdown request
* @return The number of partitions that the broker still leads.
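*
* Illustrative callback usage (a sketch; the real caller is the broker's request
* handling layer for ControlledShutdownRequest):
* {{{
*   controller.controlledShutdown(brokerId, brokerEpoch, {
*     case Success(partitionsRemaining) =>
*       // retry the controlled shutdown until no led partitions remain
*     case Failure(e: ControllerMovedException) =>
*       // re-discover the controller and retry against the new one
*     case Failure(e) =>
*       // surface the error to the shutting-down broker
*   })
* }}}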
*/
def controlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
val controlledShutdownEvent = ControlledShutdown(id, brokerEpoch, controlledShutdownCallback)
eventManager.put(controlledShutdownEvent)
}
private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
this.brokerInfo = newBrokerInfo
zkClient.updateBrokerInfo(newBrokerInfo)
}
private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
eventManager.put(UncleanLeaderElectionEnable)
}
private[kafka] def enableTopicUncleanLeaderElection(topic: String): Unit = {
if (isActive) {
eventManager.put(TopicUncleanLeaderElectionEnable(topic))
}
}
private def state: ControllerState = eventManager.state
/**
* This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
* It does the following things on the become-controller state change -
* 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
* leaders for all existing partitions.
* 2. Starts the controller's channel manager
* 3. Starts the replica state machine
* 4. Starts the partition state machine
* If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
private def onControllerFailover() {
info("Registering handlers")
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
info("Deleting log dir event notifications")
zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion)
info("Deleting isr change notifications")
zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion)
info("Initializing controller context")
initializeControllerContext()
info("Fetching topic deletions in progress")
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
info("Initializing topic deletion manager")
topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
// are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
info("Sending update metadata request")
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
replicaStateMachine.startup()
partitionStateMachine.startup()
info(s"Ready to serve as the new controller with epoch $epoch")
maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered)
info("Starting the controller scheduler")
kafkaScheduler.startup()
if (config.autoLeaderRebalanceEnable) {
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
if (config.tokenAuthEnabled) {
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = () => tokenManager.expireTokens,
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
}
private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
kafkaScheduler.schedule("auto-leader-rebalance-task", () => eventManager.put(AutoPreferredReplicaLeaderElection),
delay = delay, unit = unit)
}
/**
* This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
* required to clean up internal controller data structures
*/
private def onControllerResignation() {
debug("Resigning")
// de-register listeners
zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
// reset topic deletion manager
topicDeletionManager.reset()
// shutdown leader rebalance scheduler
kafkaScheduler.shutdown()
offlinePartitionCount = 0
preferredReplicaImbalanceCount = 0
globalTopicCount = 0
globalPartitionCount = 0
// stop token expiry check scheduler
if (tokenCleanScheduler.isStarted)
tokenCleanScheduler.shutdown()
// de-register partition ISR listener for on-going partition reassignment task
unregisterPartitionReassignmentIsrChangeHandlers()
// shutdown partition state machine
partitionStateMachine.shutdown()
zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
// shutdown replica state machine
replicaStateMachine.shutdown()
zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
controllerContext.resetContext()
info("Resigned")
}
/*
* This callback is invoked by the controller's LogDirEventNotificationListener with the list of broker ids that
* have experienced new log directory failures. In response the controller should send LeaderAndIsrRequest
* to all these brokers to query the state of their replicas. Replicas with an offline log directory respond with
* KAFKA_STORAGE_ERROR, which will be handled by the LeaderAndIsrResponseReceived event.
*/
private def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
// send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online.
info(s"Handling log directory failure for brokers ${brokerIds.mkString(",")}")
val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet)
replicaStateMachine.handleStateChanges(replicasOnBrokers.toSeq, OnlineReplica)
}
/**
* This callback is invoked by the replica state machine's broker change listener, with the list of newly started
* brokers as input. It does the following -
* 1. Sends update metadata request to all live and shutting down brokers
* 2. Triggers the OnlinePartition state change for all new/offline partitions
* 3. It checks whether there are reassigned replicas assigned to any newly started brokers. If
* so, it performs the reassignment logic for each topic/partition.
*
* Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point for two reasons:
* 1. The partition state machine, when triggering online state change, will refresh leader and ISR for only those
* partitions currently new or offline (rather than every partition this controller is aware of)
* 2. Even if we do refresh the cache, there is no guarantee that the request is still valid by the time it reaches
* every broker. Brokers check the leader epoch to determine the validity of the request.
*/
private def onBrokerStartup(newBrokers: Seq[Int]) {
info(s"New broker startup callback for ${newBrokers.mkString(",")}")
newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
val newBrokersSet = newBrokers.toSet
val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
// Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
// via this update. No need to include any partition states in the request since there are no partition state changes.
sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
// Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization.
// If a broker was restarted after a controlled shutdown, no leader elections are triggered when it comes back up,
// so in that common case this proactive update is what gets the metadata to the new broker quickly.
sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
// the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
// supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
// when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
// to see if these brokers can become leaders for some/all of those
partitionStateMachine.triggerOnlinePartitionStateChange()
// check if reassignment of some partitions need to be restarted
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
}
partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
// check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
// on the newly restarted brokers, there is a chance that topic deletion can resume
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
if (replicasForTopicsToBeDeleted.nonEmpty) {
info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
}
registerBrokerModificationsHandler(newBrokers)
}
private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
debug(s"Register BrokerModifications handler for $brokerIds")
brokerIds.foreach { brokerId =>
val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId)
zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)
brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
}
}
private def unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
debug(s"Unregister BrokerModifications handler for $brokerIds")
brokerIds.foreach { brokerId =>
brokerModificationsHandlers.remove(brokerId).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
}
}
/*
* This callback is invoked by the replica state machine's broker change listener with the list of failed brokers
* as input. It will call onReplicasBecomeOffline(...) with the list of replicas on those failed brokers as input.
*/
private def onBrokerFailure(deadBrokers: Seq[Int]) {
info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
val deadBrokersThatWereShuttingDown =
deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
if (deadBrokersThatWereShuttingDown.nonEmpty)
info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
onReplicasBecomeOffline(allReplicasOnDeadBrokers)
unregisterBrokerModificationsHandler(deadBrokers)
}
private def onBrokerUpdate(updatedBrokerId: Int) {
info(s"Broker info update callback for $updatedBrokerId")
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
}
/**
* This method marks the given replicas as offline. It does the following -
* 1. Marks the given partitions as offline
* 2. Triggers the OnlinePartition state change for all new/offline partitions
* 3. Invokes the OfflineReplica state change on the input list of newly offline replicas
* 4. If no partitions are affected then send UpdateMetadataRequest to live or shutting down brokers
*
* Note that we don't need to refresh the leader/isr cache for all topic/partitions at this point. This is because
* the partition state machine will refresh our cache for us when performing leader election for all new/offline
* partitions coming online.
*/
private def onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter { case (partition, leadershipInfo) =>
  !controllerContext.isReplicaOnline(leadershipInfo.leaderAndIsr.leader, partition) &&
    !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
}.keySet
// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
// trigger OfflineReplica state change for those newly offline replicas
replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
// fail deletion of topics that are affected by the offline replicas
if (newOfflineReplicasForDeletion.nonEmpty) {
// it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
// deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state indefinitely
// since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
}
// If replica failure did not require leader re-election, inform brokers of the offline brokers
// Note that during leader re-election, brokers update their metadata
if (partitionsWithoutLeader.isEmpty) {
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
}
}
/**
* This callback is invoked by the topic change callback with the set of newly created partitions as input.
* It does the following -
* 1. Move the newly created partitions to the NewPartition state
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
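*
* In state machine terms (descriptive of the calls below): partitions move
* NonExistentPartition -> NewPartition -> OnlinePartition and their replicas move
* NonExistentReplica -> NewReplica -> OnlineReplica, with leaders elected via the
* offline-partition election strategy on the NewPartition -> OnlinePartition step.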
*/
private def onNewPartitionCreation(newPartitions: Set[TopicPartition]) {
info(s"New partition creation callback for ${newPartitions.mkString(",")}")
partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}
/**
* This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
* reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
* Reassigning replicas for a partition goes through a few steps listed in the code.
* RAR = Reassigned replicas
* OAR = Original list of replicas for partition
* AR = current assigned replicas
*
* 1. Update AR in ZK with OAR + RAR.
* 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
* of the leader epoch in zookeeper.
* 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
* 4. Wait until all replicas in RAR are in sync with the leader.
* 5. Move all replicas in RAR to OnlineReplica state.
* 6. Set AR to RAR in memory.
* 7. If the leader is not in RAR, elect a new leader from RAR. If a new leader needs to be elected from RAR, a LeaderAndIsr
* request will be sent. If not, then the leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
* In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
* RAR - OAR back in the isr.
* 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
* isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
* After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
* 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to
* the replicas in OAR - RAR to physically delete the replicas on disk.
* 10. Update AR in ZK with RAR.
* 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
* 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
*
* For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
* may go through the following transition.
* AR leader/isr
* {1,2,3} 1/{1,2,3} (initial state)
* {1,2,3,4,5,6} 1/{1,2,3} (step 2)
* {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
* {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
* {1,2,3,4,5,6} 4/{4,5,6} (step 8)
* {4,5,6} 4/{4,5,6} (step 10)
*
* Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
* This way, if the controller crashes before that step, we can still recover.
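*
* A minimal sketch of the set arithmetic used throughout these steps (using the example
* OAR/RAR above, not actual controller state):
* {{{
*   val oar = Set(1, 2, 3)        // original assigned replicas
*   val rar = Set(4, 5, 6)        // reassigned replicas
*   val toStart = rar -- oar      // step 3: {4,5,6} move to NewReplica
*   val toStop = oar -- rar       // steps 8-9: {1,2,3} are taken offline, then deleted
*   val transientAr = oar ++ rar  // steps 1-9: AR = {1,2,3,4,5,6}
* }}}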
*/
private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
"caught up with the leader")
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
//1. Update AR in ZK with OAR + RAR.
updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
//2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition),
newAndOldReplicas.toSeq)
//3. replicas in RAR - OAR -> NewReplica
startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
info(s"Waiting for new replicas ${reassignedReplicas.mkString(",")} for partition ${topicPartition} being " +
"reassigned to catch up with the leader")
} else {
//4. Wait until all replicas in RAR are in sync with the leader.
val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet
//5. replicas in RAR -> OnlineReplica
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)
}
//6. Set AR to RAR in memory.
//7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
// a new AR (using RAR) and same isr to every broker in RAR
moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)
//8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
//9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)
//10. Update AR in ZK with RAR.
updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
//11. Update the /admin/reassign_partitions path in ZK to remove this partition.
removePartitionsFromReassignedPartitions(Set(topicPartition))
//12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
// signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
}
}
/**
* Trigger partition reassignment for the provided partitions if the assigned replicas are not the same as the
* reassigned replicas (as defined in `ControllerContext.partitionsBeingReassigned`) and if the topic has not been
* deleted.
*
* `partitionsBeingReassigned` must be populated with all partitions being reassigned before this method is invoked
* as explained in the method documentation of `removePartitionsFromReassignedPartitions` (which is invoked by this
* method).
*
* @throws IllegalStateException if a partition is not in `partitionsBeingReassigned`
*/
private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {
val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]
topicPartitions.foreach { tp =>
if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
info(s"Skipping reassignment of $tp since the topic is currently being deleted")
partitionsToBeRemovedFromReassignment.add(tp)
} else {
val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")
}
val newReplicas = reassignedPartitionContext.newReplicas
val topic = tp.topic
val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
if (assignedReplicas.nonEmpty) {
if (assignedReplicas == newReplicas) {
info(s"Partition $tp to be reassigned is already assigned to replicas " +
s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
partitionsToBeRemovedFromReassignment.add(tp)
} else {
try {
info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
// first register ISR change listener
reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
// mark topic ineligible for deletion for the partitions being reassigned
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(tp, reassignedPartitionContext)
} catch {
case e: ControllerMovedException =>
error(s"Error completing reassignment of partition $tp because controller has moved to another broker", e)
throw e
case e: Throwable =>
error(s"Error completing reassignment of partition $tp", e)
// remove the partition from the admin path to unblock the admin client
partitionsToBeRemovedFromReassignment.add(tp)
}
}
} else {
error(s"Ignoring request to reassign partition $tp that doesn't exist.")
partitionsToBeRemovedFromReassignment.add(tp)
}
}
}
removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
}
sealed trait ElectionType
object AutoTriggered extends ElectionType
object ZkTriggered extends ElectionType
object AdminClientTriggered extends ElectionType
/**
* Attempt to elect the preferred replica as leader for each of the given partitions.
* @param partitions The partitions to have their preferred leader elected
* @param electionType The election type
* @return A map of failed elections where keys are partitions which had an error and the corresponding value is
* the exception that was thrown.
*/
private def onPreferredReplicaElection(partitions: Set[TopicPartition],
electionType: ElectionType): Map[TopicPartition, Throwable] = {
info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
try {
val results = partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition,
Option(PreferredReplicaPartitionLeaderElectionStrategy))
if (electionType != AdminClientTriggered) {
results.foreach { case (tp, throwable) =>
if (throwable.isInstanceOf[ControllerMovedException]) {
error(s"Error completing preferred replica leader election for partition $tp because controller has moved to another broker.", throwable)
throw throwable
} else {
error(s"Error completing preferred replica leader election for partition $tp", throwable)
}
}
}
results
} finally {
if (electionType != AdminClientTriggered)
removePartitionsFromPreferredReplicaElection(partitions, electionType == AutoTriggered)
}
}
private def initializeControllerContext() {
// read the live brokers (and their epochs) from zookeeper and initialize the controller's broker cache
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
controllerContext.setLiveBrokerAndEpochs(curBrokerAndEpochs)
info(s"Initialized broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
case (topicPartition, assignedReplicas) => controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
}
controllerContext.partitionLeadershipInfo.clear()
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// register broker modifications handlers
registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
startChannelManager()
initializePartitionReassignment()
info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
info(s"Current list of topics in the cluster: ${controllerContext.allTopics}")
}
private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
val topicDeleted = replicas.isEmpty
val successful =
if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false
successful || topicDeleted
}
val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")
info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")
info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")
info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")
pendingPreferredReplicaElections
}
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
val partitionsBeingReassigned = zkClient.getPartitionReassignment
info(s"Partitions being reassigned: $partitionsBeingReassigned")
controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned.iterator.map { case (tp, newReplicas) =>
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
}
}
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic =>
  val replicasForTopic = controllerContext.replicasForTopic(topic)
  replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
}
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")
(topicsToBeDeleted, topicsIneligibleForDeletion)
}
private def startChannelManager() {
controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
stateChangeLogger, threadNamePrefix)
controllerContext.controllerChannelManager.startup()
}
private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq) {
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
}
}
private def areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean = {
zkClient.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
}
}
private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
val currentLeader = controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader
// change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
// request to the current or new leader. This will prevent it from adding the old replicas to the ISR
val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
controllerContext.updatePartitionReplicaAssignment(topicPartition, reassignedReplicas)
if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
// move the leader to one of the alive and caught up new replicas
partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
} else {
// check if the leader is alive or not
if (controllerContext.isReplicaOnline(currentLeader, topicPartition)) {
info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} and is alive")
// shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
updateLeaderEpochAndSendRequest(topicPartition, oldAndNewReplicas, reassignedReplicas)
} else {
info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
s"is already in the new list of replicas ${reassignedReplicas.mkString(",")} but is dead")
partitionStateMachine.handleStateChanges(Seq(topicPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
}
}
}
private def stopOldReplicasOfReassignedPartition(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
oldReplicas: Set[Int]) {
// first move the replica to offline state (the controller removes it from the ISR)
val replicasToBeDeleted = oldReplicas.map(PartitionAndReplica(topicPartition, _))
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, OfflineReplica)
// send stop replica command to the old replicas
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionStarted)
// TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionSuccessful)
replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, NonExistentReplica)
}
private def updateAssignedReplicasForPartition(partition: TopicPartition,
replicas: Seq[Int]) {
controllerContext.updatePartitionReplicaAssignment(partition, replicas)
val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic), controllerContext.epochZkVersion)
setDataResponse.resultCode match {
case Code.OK =>
info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}")
// update the assigned replica list after a successful zookeeper write
controllerContext.updatePartitionReplicaAssignment(partition, replicas)
case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist")
case _ => throw new KafkaException(setDataResponse.resultException.get)
}
}
private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
newReplicas: Set[Int]) {
// send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
// replicas list
newReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), NewReplica)
}
}
private def updateLeaderEpochAndSendRequest(partition: TopicPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
updateLeaderEpoch(partition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition,
updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}
stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with new assigned replica " +
s"list ${newAssignedReplicas.mkString(",")} to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
s"for partition being reassigned $partition")
case None => // fail the reassignment
stateChangeLog.error("Failed to send LeaderAndIsr request with new assigned replica list " +
s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $partition")
}
}
private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
topics.foreach { topic =>
val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
partitionModificationsHandlers.put(topic, partitionModificationsHandler)
}
partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
}
private[controller] def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
topics.foreach { topic =>
partitionModificationsHandlers.remove(topic).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
}
}
private def unregisterPartitionReassignmentIsrChangeHandlers() {
controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient))
}
/**
* Remove partition from partitions being reassigned in ZooKeeper and ControllerContext. If the partition reassignment
* is complete (i.e. there is no other partition with a reassignment in progress), the reassign_partitions znode
* is deleted.
*
* `ControllerContext.partitionsBeingReassigned` must be populated with all partitions being reassigned before this
* method is invoked to avoid premature deletion of the `reassign_partitions` znode.
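*
* For reference, the reassign_partitions znode holds JSON of the well-known shape below
* (shown for illustration):
* {{{
*   {"version":1,"partitions":[{"topic":"t1","partition":0,"replicas":[4,5,6]}]}
* }}}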
*/
private def removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]) {
partitionsToBeRemoved.map(controllerContext.partitionsBeingReassigned).foreach { reassignContext =>
reassignContext.unregisterReassignIsrChangeHandler(zkClient)
}
val updatedPartitionsBeingReassigned = controllerContext.partitionsBeingReassigned -- partitionsToBeRemoved
info(s"Removing partitions $partitionsToBeRemoved from the list of reassigned partitions in zookeeper")
// write the new list to zookeeper
if (updatedPartitionsBeingReassigned.isEmpty) {
info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}")
zkClient.deletePartitionReassignment(controllerContext.epochZkVersion)
// Ensure we detect future reassignments
eventManager.put(PartitionReassignment)
} else {
val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion)
catch {
case e: KeeperException => throw new AdminOperationException(e)
}
}
controllerContext.partitionsBeingReassigned --= partitionsToBeRemoved
}
private def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicPartition],
isTriggeredByAutoRebalance: Boolean) {
for (partition <- partitionsToBeRemoved) {
// check the status
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
if (currentLeader == preferredReplica) {
info(s"Partition $partition completed preferred replica leader election. New leader is $preferredReplica")
} else {
warn(s"Partition $partition failed to complete preferred replica leader election to $preferredReplica. " +
s"Leader is still $currentLeader")
}
}
if (!isTriggeredByAutoRebalance) {
zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion)
// Ensure we detect future preferred replica leader elections
eventManager.put(PreferredReplicaLeaderElection(None))
}
}
private[controller] def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
callback: AbstractResponse => Unit = null) = {
controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
}
/**
* Send the leader information for selected partitions to selected brokers so that they can correctly respond to
* metadata requests
*
* @param brokers The brokers that the update metadata request should be sent to
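* @param partitions The partitions whose state should be included; callers pass an empty set
*                   when only the live broker list needs to be propagated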
*/
private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]) {
try {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}
}
/**
* Does not change leader or isr, but just increments the leader epoch
*
* @param partition partition
* @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
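*
* The zookeeper write is conditional on the leaderAndIsr znode version: a lost race is
* retried against freshly read state (the loop below), and the attempt aborts if a newer
* controller epoch is observed.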
*/
private def updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
debug(s"Updating leader epoch for partition $partition")
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
zkWriteCompleteOrUnnecessary = zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
case Some(leaderIsrAndControllerEpoch) =>
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
if (controllerEpoch > epoch)
throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
s"means the current controller with epoch $epoch went through a soft failure and another " +
s"controller was elected with epoch $controllerEpoch. Aborting state change by this controller")
// increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
// assigned replica list
val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
// update the new leadership decision in zookeeper or retry
val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) =
zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
if (successfulUpdates.contains(partition)) {
val finalLeaderAndIsr = successfulUpdates(partition)
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
info(s"Updated leader epoch for partition $partition to ${finalLeaderAndIsr.leaderEpoch}")
true
} else if (failedUpdates.contains(partition)) {
throw failedUpdates(partition)
} else false
case None =>
throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
"leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
}
}
finalLeaderIsrAndControllerEpoch
}
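// Worked example for the imbalance check below (illustrative numbers): if a broker is the
// preferred leader for 10 partitions but currently leads only 7 of them, its imbalance
// ratio is 3 / 10 = 0.3; with the default leader.imbalance.per.broker.percentage of 10
// (i.e. 0.10), 0.3 > 0.10 and a preferred replica election is triggered for the three
// partitions whose leadership has drifted.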
private def checkAndTriggerAutoLeaderRebalance(): Unit = {
trace("Checking need to trigger auto leader balancing")
val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] =
controllerContext.allPartitions.filterNot {
tp => topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
}.map { tp =>
(tp, controllerContext.partitionReplicaAssignment(tp))
}.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers")
// for each broker, check if a preferred replica election needs to be triggered
preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicPartitionsForBroker) =>
val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
}
debug(s"Topics not in preferred replica for broker $leaderBroker $topicsNotInPreferredReplica")
val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicPartitionsForBroker.size
trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio")
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
controllerContext.allTopics.contains(tp.topic))
onPreferredReplicaElection(candidatePartitions.toSet, AutoTriggered)
}
}
}
case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
def state = ControllerState.AutoLeaderBalance
override def process(): Unit = {
if (!isActive) return
try {
checkAndTriggerAutoLeaderRebalance()
} finally {
scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
}
}
}
case object UncleanLeaderElectionEnable extends ControllerEvent {
def state = ControllerState.UncleanLeaderElectionEnable
override def process(): Unit = {
if (!isActive) return
partitionStateMachine.triggerOnlinePartitionStateChange()
}
}
case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent {
def state = ControllerState.TopicUncleanLeaderElectionEnable
override def process(): Unit = {
if (!isActive) return
partitionStateMachine.triggerOnlinePartitionStateChange(topic)
}
}
case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends PreemptableControllerEvent {
def state = ControllerState.ControlledShutdown
override def handlePreempt(): Unit = {
controlledShutdownCallback(Failure(new ControllerMovedException("Controller moved to another broker")))
}
override def handleProcess(): Unit = {
val controlledShutdownResult = Try { doControlledShutdown(id) }
controlledShutdownCallback(controlledShutdownResult)
}
private def doControlledShutdown(id: Int): Set[TopicPartition] = {
if (!isActive) {
throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
}
// broker epoch in the request is unknown if the requesting broker hasn't been upgraded to use KIP-380,
// so we keep the previous behavior and don't reject the request
if (brokerEpoch != AbstractControlRequest.UNKNOWN_BROKER_EPOCH) {
val cachedBrokerEpoch = controllerContext.liveBrokerIdAndEpochs(id)
if (brokerEpoch < cachedBrokerEpoch) {
val stateBrokerEpochErrorMessage = "Received controlled shutdown request from an old broker epoch " +
s"$brokerEpoch for broker $id. Current broker epoch is $cachedBrokerEpoch."
info(stateBrokerEpochErrorMessage)
throw new StaleBrokerEpochException(stateBrokerEpochErrorMessage)
}
}
info(s"Shutting down broker $id")
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
throw new BrokerNotAvailableException(s"Broker id $id does not exist.")
controllerContext.shuttingDownBrokerIds.add(id)
debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
controllerContext.partitionReplicaAssignment(partition).size > 1 &&
controllerContext.partitionLeadershipInfo.contains(partition) &&
!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
}
val (partitionsLedByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
}
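// For partitions this broker leads, move leadership away using the controlled-shutdown
// election strategy; for partitions it merely follows, remove it from the ISR and send it a
// StopReplica (deletePartition = false) so it stops fetching without deleting any data.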
partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
try {
brokerRequestBatch.newBatch()
partitionsFollowedByBroker.foreach { partition =>
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false,
(_, _) => ())
}
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}
// If the broker is a follower, update the isr in ZK and notify the current leader
replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
def replicatedPartitionsBrokerLeads() = {
trace(s"All leaders = ${controllerContext.partitionLeadershipInfo.mkString(",")}")
controllerContext.partitionLeadershipInfo.filter {
case (topicPartition, leaderIsrAndControllerEpoch) =>
!topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
}.keys
}
replicatedPartitionsBrokerLeads().toSet
}
}
case class LeaderAndIsrResponseReceived(leaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
def state = ControllerState.LeaderAndIsrResponseReceived
override def process(): Unit = {
import JavaConverters._
if (!isActive) return
val leaderAndIsrResponse = leaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
if (leaderAndIsrResponse.error != Errors.NONE) {
stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
return
}
val offlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
case (tp, error) if error == Errors.KAFKA_STORAGE_ERROR => tp
}
val onlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
case (tp, error) if error == Errors.NONE => tp
}
val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
if (newOfflineReplicas.nonEmpty) {
stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
onReplicasBecomeOffline(newOfflineReplicas.map(PartitionAndReplica(_, brokerId)))
}
}
}
case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
def state = ControllerState.TopicDeletion
override def process(): Unit = {
import JavaConverters._
if (!isActive) return
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
debug(s"Delete topic callback invoked for $stopReplicaResponse")
val responseMap = stopReplicaResponse.responses.asScala
val partitionsInError =
if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
val replicasInError = partitionsInError.map(PartitionAndReplica(_, replicaId))
// move all the failed replicas to ReplicaDeletionIneligible
topicDeletionManager.failReplicaDeletion(replicasInError)
if (replicasInError.size != responseMap.size) {
// some replicas could have been successfully deleted
val deletedReplicas = responseMap.keySet -- partitionsInError
topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
}
}
}
case object Startup extends ControllerEvent {
def state = ControllerState.ControllerChange
override def process(): Unit = {
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
elect()
}
}
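// Recompute the controller gauges. An inactive controller reports zero for all of them, so only the
// active controller exposes meaningful values.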
private def updateMetrics(): Unit = {
offlinePartitionCount =
if (!isActive) {
0
} else {
partitionStateMachine.offlinePartitionCount
}
preferredReplicaImbalanceCount =
if (!isActive) {
0
} else {
controllerContext.allPartitions.count { topicPartition =>
val replicas = controllerContext.partitionReplicaAssignment(topicPartition)
val preferredReplica = replicas.head
val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
leadershipInfo.exists(_.leaderAndIsr.leader != preferredReplica) &&
!topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
}
}
globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size
}
// visible for testing
private[controller] def handleIllegalState(e: IllegalStateException): Nothing = {
// Resign if the controller is in an illegal state
error("Forcing the controller to resign")
brokerRequestBatch.clear()
triggerControllerMove()
throw e
}
private def triggerControllerMove(): Unit = {
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (!isActive) {
warn("Controller has already moved when trying to trigger controller movement")
return
}
try {
val expectedControllerEpochZkVersion = controllerContext.epochZkVersion
activeControllerId = -1
onControllerResignation()
zkClient.deleteController(expectedControllerEpochZkVersion)
} catch {
case _: ControllerMovedException =>
warn("Controller has already moved when trying to trigger controller movement")
}
}
private def maybeResign(): Unit = {
val wasActiveBeforeChange = isActive
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
onControllerResignation()
}
}
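/**
* Attempt to become the controller by creating the ephemeral /controller znode and bumping the
* persistent controller epoch in one atomic ZooKeeper operation. At most one broker can win; the
* losers observe the ControllerMovedException, cache the winner's id and watch the znode for the
* next election.
*/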
private def elect(): Unit = {
activeControllerId = zkClient.getControllerId.getOrElse(-1)
/*
* We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
* it's possible that the controller has already been elected when we get here. This check will prevent the following
* createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
*/
if (activeControllerId != -1) {
debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
try {
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
controllerContext.epoch = epoch
controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
s"and epoch zk version is now ${controllerContext.epochZkVersion}")
onControllerFailover()
} catch {
case e: ControllerMovedException =>
maybeResign()
if (activeControllerId != -1)
debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
else
warn("A controller has been elected but just resigned, this will result in another round of election", e)
case t: Throwable =>
error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
s"Trigger controller movement immediately", t)
triggerControllerMove()
}
}
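/**
* Compares the broker registrations in ZK against the cached live set and classifies each broker id as
* new (in ZK only), dead (in the cache only) or bounced (in both, but re-registered with a higher
* broker epoch before the old session's expiration was observed). A bounced broker is handled as a
* failure immediately followed by a startup.
*/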
case object BrokerChange extends ControllerEvent {
override def state: ControllerState = ControllerState.BrokerChange
override def process(): Unit = {
if (!isActive) return
val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
val curBrokerIds = curBrokerIdAndEpochs.keySet
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
.filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
val newBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => newBrokerIds.contains(broker.id))
val bouncedBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => bouncedBrokerIds.contains(broker.id))
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, " +
s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
newBrokerAndEpochs.keySet.foreach(controllerContext.controllerChannelManager.addBroker)
bouncedBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
bouncedBrokerAndEpochs.keySet.foreach(controllerContext.controllerChannelManager.addBroker)
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
if (newBrokerIds.nonEmpty) {
controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
onBrokerStartup(newBrokerIdsSorted)
}
if (bouncedBrokerIds.nonEmpty) {
controllerContext.removeLiveBrokersAndEpochs(bouncedBrokerIds)
onBrokerFailure(bouncedBrokerIdsSorted)
controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
onBrokerStartup(bouncedBrokerIdsSorted)
}
if (deadBrokerIds.nonEmpty) {
controllerContext.removeLiveBrokersAndEpochs(deadBrokerIds)
onBrokerFailure(deadBrokerIdsSorted)
}
if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty || bouncedBrokerIds.nonEmpty) {
info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
}
}
}
case class BrokerModifications(brokerId: Int) extends ControllerEvent {
override def state: ControllerState = ControllerState.BrokerChange
override def process(): Unit = {
if (!isActive) return
val newMetadata = zkClient.getBroker(brokerId)
val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId)
if (newMetadata.nonEmpty && oldMetadata.nonEmpty && newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) {
info(s"Updated broker: ${newMetadata.get}")
controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
onBrokerUpdate(brokerId)
}
}
}
case object TopicChange extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicChange
override def process(): Unit = {
if (!isActive) return
val topics = zkClient.getAllTopicsInCluster.toSet
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
deletedTopics.foreach(controllerContext.removeTopic)
addedPartitionReplicaAssignment.foreach {
case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas)
}
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
}
case object LogDirEventNotification extends ControllerEvent {
override def state: ControllerState = ControllerState.LogDirChange
override def process(): Unit = {
if (!isActive) return
val sequenceNumbers = zkClient.getAllLogDirEventNotifications
try {
val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
onBrokerLogDirFailure(brokerIds)
} finally {
// delete processed children
zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
}
}
}
case class PartitionModifications(topic: String) extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicChange
def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment: immutable.Map[TopicPartition, Seq[Int]]): Unit = {
info(s"Restoring the partition replica assignment for topic $topic")
val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter(p =>
existingPartitions.contains(p._1.partition.toString))
zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment, controllerContext.epochZkVersion)
}
override def process(): Unit = {
if (!isActive) return
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
}
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
if (partitionsToBeAdded.nonEmpty) {
warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
.format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
} else {
// This can happen if an existing partition replica assignment was restored to prevent increasing the partition count during topic deletion
info("Ignoring partition change during topic deletion as no new partitions are added")
}
} else {
if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
}
onNewPartitionCreation(partitionsToBeAdded.keySet)
}
}
}
}
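/**
* Fired when the children of /admin/delete_topics change. Deletion requests for non-existent topics are
* removed from ZK immediately; if delete.topic.enable=false, all requests are dropped. Otherwise the
* topics are enqueued with the TopicDeletionManager, marked ineligible for deletion while any of their
* partitions are being reassigned.
*/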
case object TopicDeletion extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicDeletion
override def process(): Unit = {
if (!isActive) return
var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
}
topicsToBeDeleted --= nonExistentTopics
if (config.deleteTopicEnable) {
if (topicsToBeDeleted.nonEmpty) {
info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
// mark topic ineligible for deletion if other state changes are in progress
topicsToBeDeleted.foreach { topic =>
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
if (partitionReassignmentInProgress)
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
}
// add topic to deletion list
topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
} else {
// If topic deletion is disabled, remove the entries under the ZooKeeper path /admin/delete_topics
info(s"Removing $topicsToBeDeleted since delete topic is disabled")
zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
}
}
}
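// The /admin/reassign_partitions znode holds a JSON payload of the following form (topic, partition
// and replica values below are illustrative):
// {"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2,3]}]}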
case object PartitionReassignment extends ControllerEvent {
override def state: ControllerState = ControllerState.PartitionReassignment
override def process(): Unit = {
if (!isActive) return
// We need to register the watcher if the path doesn't exist in order to detect future reassignments, and we
// get the `path exists` check for free
if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
val partitionReassignment = zkClient.getPartitionReassignment
// Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
// `maybeTriggerPartitionReassignment` (see method documentation for the reason)
partitionReassignment.foreach { case (tp, newReplicas) =>
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
tp)
controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
}
maybeTriggerPartitionReassignment(partitionReassignment.keySet)
}
}
}
case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
override def state: ControllerState = ControllerState.PartitionReassignment
override def process(): Unit = {
if (!isActive) return
// check if this partition is still being reassigned or not
controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
if (caughtUpReplicas == reassignedReplicas) {
// resume the partition reassignment process
info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
s"partition $partition being reassigned. Resuming partition reassignment")
onPartitionReassignment(partition, reassignedPartitionContext)
}
else {
info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
s"partition $partition being reassigned. Replica(s) " +
s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
}
case None => error(s"Error handling reassignment of partition $partition to replicas " +
s"${reassignedReplicas.mkString(",")} as it was never created")
}
}
}
}
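/**
* Processes the children of /isr_change_notification written by brokers after an ISR changes: the
* leadership cache is refreshed for the affected partitions, the change is propagated to all brokers
* via UpdateMetadata requests, and the processed notifications are deleted.
*/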
case object IsrChangeNotification extends ControllerEvent {
override def state: ControllerState = ControllerState.IsrChange
override def process(): Unit = {
if (!isActive) return
val sequenceNumbers = zkClient.getAllIsrChangeNotifications
try {
val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
if (partitions.nonEmpty) {
updateLeaderAndIsrCache(partitions)
processUpdateNotifications(partitions)
}
} finally {
// delete the notifications
zkClient.deleteIsrChangeNotifications(sequenceNumbers, controllerContext.epochZkVersion)
}
}
private def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
}
}
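/**
* Callback invoked with a map of partition -> newly elected preferred leader for successful elections,
* and a map of partition -> ApiError for all other partitions (including ApiError.NONE for partitions
* whose preferred replica was already the leader).
*/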
type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError]) => Unit
def electPreferredLeaders(partitions: Set[TopicPartition], callback: ElectPreferredLeadersCallback = (_, _) => {}): Unit =
eventManager.put(PreferredReplicaLeaderElection(Some(partitions), AdminClientTriggered, callback))
case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
electionType: ElectionType = ZkTriggered,
callback: ElectPreferredLeadersCallback = (_, _) => {}) extends PreemptableControllerEvent {
override def state: ControllerState = ControllerState.ManualLeaderBalance
override def handlePreempt(): Unit = {
callback(Map.empty, partitionsFromAdminClientOpt match {
case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
case None => Map.empty
})
}
override def handleProcess(): Unit = {
if (!isActive) {
callback(Map.empty, partitionsFromAdminClientOpt match {
case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
case None => Map.empty
})
} else {
// We need to register the watcher if the path doesn't exist in order to detect future preferred replica
// leader elections, and we get the `path exists` check for free
if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
val partitions = partitionsFromAdminClientOpt match {
case Some(partitions) => partitions
case None => zkClient.getPreferredReplicaElection
}
val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
invalidPartitions.foreach { p =>
info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.")
}
val (partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition =>
topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
if (partitionsBeingDeleted.nonEmpty) {
warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " +
s"since the respective topics are being deleted")
}
// partition the live partitions into those whose preferred replica is already the leader and those requiring election
val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition =>
val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
val preferredReplica = assignedReplicas.head
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
currentLeader != preferredReplica
}
val electionErrors = onPreferredReplicaElection(electablePartitions, electionType)
val successfulPartitions = electablePartitions -- electionErrors.keySet
val results = electionErrors.map { case (partition, ex) =>
val apiError = if (ex.isInstanceOf[StateChangeFailedException])
new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage)
else
ApiError.fromThrowable(ex)
partition -> apiError
} ++
alreadyPreferred.map(_ -> ApiError.NONE) ++
partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++
invalidPartitions.map(tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results")
callback(successfulPartitions.map(
tp => tp->controllerContext.partitionReplicaAssignment(tp).head).toMap,
results)
}
}
}
}
case object ControllerChange extends ControllerEvent {
override def state = ControllerState.ControllerChange
override def process(): Unit = {
maybeResign()
}
}
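// ControllerChange (fired when the /controller znode is created or its data changes) only resigns if
// this broker lost the controllership; Reelect (fired when the znode is deleted) additionally
// re-enters the election.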
case object Reelect extends ControllerEvent {
override def state = ControllerState.ControllerChange
override def process(): Unit = {
maybeResign()
elect()
}
}
case object RegisterBrokerAndReelect extends ControllerEvent {
override def state: ControllerState = ControllerState.ControllerChange
override def process(): Unit = {
_brokerEpoch = zkClient.registerBroker(brokerInfo)
Reelect.process()
}
}
// We can't make this a case object because each instance needs its own CountDownLatch field
class Expire extends ControllerEvent {
private val processingStarted = new CountDownLatch(1)
override def state = ControllerState.ControllerChange
override def process(): Unit = {
processingStarted.countDown()
activeControllerId = -1
onControllerResignation()
}
def waitUntilProcessingStarted(): Unit = {
processingStarted.await()
}
}
}
class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = BrokerIdsZNode.path
override def handleChildChange(): Unit = {
eventManager.put(controller.BrokerChange)
}
}
class BrokerModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
override val path: String = BrokerIdZNode.path(brokerId)
override def handleDataChange(): Unit = {
eventManager.put(controller.BrokerModifications(brokerId))
}
}
class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = TopicsZNode.path
override def handleChildChange(): Unit = eventManager.put(controller.TopicChange)
}
class LogDirEventNotificationHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = LogDirEventNotificationZNode.path
override def handleChildChange(): Unit = eventManager.put(controller.LogDirEventNotification)
}
object LogDirEventNotificationHandler {
val Version: Long = 1L
}
class PartitionModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
override val path: String = TopicZNode.path(topic)
override def handleDataChange(): Unit = eventManager.put(controller.PartitionModifications(topic))
}
class TopicDeletionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = DeleteTopicsZNode.path
override def handleChildChange(): Unit = eventManager.put(controller.TopicDeletion)
}
class PartitionReassignmentHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
override val path: String = ReassignPartitionsZNode.path
// Note that the event is also enqueued when the znode is deleted, but we do that explicitly rather than relying on
// handleDeletion(). This approach is more robust because it doesn't depend on the watcher being re-registered after
// it is consumed by a data change (re-registration is guaranteed when the znode is deleted).
override def handleCreation(): Unit = eventManager.put(controller.PartitionReassignment)
}
class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
override val path: String = TopicPartitionStateZNode.path(partition)
override def handleDataChange(): Unit = eventManager.put(controller.PartitionReassignmentIsrChange(partition))
}
class IsrChangeNotificationHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = IsrChangeNotificationZNode.path
override def handleChildChange(): Unit = eventManager.put(controller.IsrChangeNotification)
}
object IsrChangeNotificationHandler {
val Version: Long = 1L
}
class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
override val path: String = PreferredReplicaElectionZNode.path
override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection(None))
}
class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
override val path: String = ControllerZNode.path
override def handleCreation(): Unit = eventManager.put(controller.ControllerChange)
override def handleDeletion(): Unit = eventManager.put(controller.Reelect)
override def handleDataChange(): Unit = eventManager.put(controller.ControllerChange)
}
case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler) {
def registerReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
def unregisterReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
zkClient.unregisterZNodeChangeHandler(reassignIsrChangeHandler.path)
}
case class PartitionAndReplica(topicPartition: TopicPartition, replica: Int) {
def topic: String = topicPartition.topic
def partition: Int = topicPartition.partition
override def toString: String = {
s"[Topic=$topic,Partition=$partition,Replica=$replica]"
}
}
case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
override def toString: String = {
s"(Leader:${leaderAndIsr.leader},ISR:${leaderAndIsr.isr.mkString(",")}," +
s"LeaderEpoch:${leaderAndIsr.leaderEpoch},ControllerEpoch:$controllerEpoch)"
}
}
private[controller] class ControllerStats extends KafkaMetricsGroup {
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>
state.rateAndTimeMetricName.map { metricName =>
state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
}.toMap
}
sealed trait ControllerEvent {
val enqueueTimeMs: Long = Time.SYSTEM.milliseconds()
def state: ControllerState
def process(): Unit
}
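// A minimal sketch of the preempt/process contract (hypothetical event, shown as a comment because
// the trait below is sealed and real subclasses must live in this file): whichever of preempt() and
// process() runs first atomically flips `spent`, so the other call becomes a no-op and the event's
// callback fires at most once.
//
// case class CallbackEvent(cb: Try[Unit] => Unit) extends PreemptableControllerEvent {
//   override def state: ControllerState = ControllerState.ControllerChange
//   override def handlePreempt(): Unit = cb(Failure(new ControllerMovedException("preempted")))
//   override def handleProcess(): Unit = cb(Try(()))
// }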
/**
* A `ControllerEvent` that needs specific handling, such as completing a client callback with an error,
* if it is preempted by ZK session expiration before it is processed.
*/
sealed trait PreemptableControllerEvent extends ControllerEvent {
val spent = new AtomicBoolean(false)
final def preempt(): Unit = {
if (!spent.getAndSet(true))
handlePreempt()
}
final def process(): Unit = {
if (!spent.getAndSet(true))
handleProcess()
}
def handlePreempt(): Unit
def handleProcess(): Unit
}