blob: e41c89910c8798a7a6c1623453bd2ffb449b5ef6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import kafka.cluster.Broker
import org.apache.kafka.common.TopicPartition
import scala.collection.{Seq, Set, mutable}
class ControllerContext {
// Statistics holder created once per context (see ControllerStats for what it records).
val stats = new ControllerStats
// Maintained by updatePartitionStateMetrics: counts partitions currently in OfflinePartition,
// ignoring topics whose deletion has started. Reset to 0 by clearTopicsState().
var offlinePartitionCount = 0
// Ids subtracted from the registered broker ids when computing liveBrokerIds.
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
// Registered brokers and their epochs keyed by broker id; both are written together by
// setLiveBrokerAndEpochs / addLiveBrokersAndEpochs / removeLiveBrokers.
private var liveBrokers: Set[Broker] = Set.empty
private var liveBrokerEpochs: Map[Int, Long] = Map.empty
// Controller epoch and its ZooKeeper version; resetContext() sets both back to 0.
var epoch: Int = KafkaController.InitialControllerEpoch
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
// Topic names known to this context; removeTopic() drops entries, clearTopicsState() empties it.
var allTopics: Set[String] = Set.empty
// topic -> (partition id -> replica broker ids).
val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, Seq[Int]]]
// Per-partition leader/ISR information, keyed by partition.
val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
// Per-partition reassignment context, keyed by partition.
val partitionsBeingReassigned = mutable.Map.empty[TopicPartition, ReassignedPartitionsContext]
// Current state of each partition; written through putPartitionState / putPartitionStateIfNotExists.
val partitionStates = mutable.Map.empty[TopicPartition, PartitionState]
// Current state of each replica; written through putReplicaState / putReplicaStateIfNotExists.
val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]
// broker id -> partitions whose replica on that broker is treated as offline by isReplicaOnline.
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
// Topics queued for deletion (see queueTopicDeletion / isTopicQueuedUpForDeletion).
val topicsToBeDeleted = mutable.Set.empty[String]
/** The following topicsWithDeletionStarted variable is used to properly update the offlinePartitionCount metric.
* When a topic is going through deletion, we don't want to keep track of its partition state
* changes in the offlinePartitionCount metric. This goal means if some partitions of a topic are already
* in OfflinePartition state when deletion starts, we need to change the corresponding partition
* states to NonExistentPartition first before starting the deletion.
*
* However we can NOT change partition states to NonExistentPartition at the time of enqueuing topics
* for deletion. The reason is that when a topic is enqueued for deletion, it may be ineligible for
* deletion due to ongoing partition reassignments. Hence there might be a delay between enqueuing
* a topic for deletion and the actual start of deletion. In this delayed interval, partitions may still
* transition to or out of the OfflinePartition state.
*
* Hence we decide to change partition states to NonExistentPartition only when the actual deletion has started.
* For topics whose deletion has actually started, we keep track of them in the following topicsWithDeletionStarted
* variable. Once a topic is in the topicsWithDeletionStarted set, we are sure there will no longer
* be partition reassignments to any of its partitions, and only then is it safe to move its partitions to the
* NonExistentPartition state. Once a topic is in the topicsWithDeletionStarted set, we will stop monitoring
* its partition state changes in the offlinePartitionCount metric.
*/
val topicsWithDeletionStarted = mutable.Set.empty[String]
// Topics that isTopicEligibleForDeletion rejects even when they are queued for deletion.
val topicsIneligibleForDeletion = mutable.Set.empty[String]
/**
 * Looks up the replica list currently recorded for a partition.
 *
 * @param topicPartition the partition to look up
 * @return the replica ids stored in `partitionAssignments`, or an empty sequence when
 *         either the topic or the partition has no entry
 */
def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = {
  partitionAssignments.get(topicPartition.topic)
    .flatMap(assignmentsByPartition => assignmentsByPartition.get(topicPartition.partition))
    .getOrElse(Seq.empty)
}
/**
 * Drops every piece of topic-, partition- and replica-level state held by this context
 * and resets the offline partition counter.
 */
private def clearTopicsState(): Unit = {
  // Topic and assignment bookkeeping.
  allTopics = Set.empty
  partitionAssignments.clear()
  partitionsBeingReassigned.clear()
  replicasOnOfflineDirs.clear()

  // Leadership and state-machine bookkeeping.
  partitionLeadershipInfo.clear()
  partitionStates.clear()
  replicaStates.clear()

  // With no partition state left there is nothing to count as offline.
  offlinePartitionCount = 0
}
/**
 * Records `newReplicas` as the assignment of `topicPartition`, creating the per-topic
 * map on first use and overwriting any previous assignment of the partition.
 *
 * @param topicPartition the partition whose assignment is written
 * @param newReplicas    the replica ids to store
 */
def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
  val assignmentsForTopic = partitionAssignments.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
  assignmentsForTopic(topicPartition.partition) = newReplicas
}
/**
 * Builds an immutable snapshot of the assignments recorded for one topic.
 *
 * @param topic the topic name
 * @return partition -> replica ids for every partition recorded under `topic`;
 *         empty when the topic has no entry in `partitionAssignments`
 */
def partitionReplicaAssignmentForTopic(topic : String): Map[TopicPartition, Seq[Int]] = {
  partitionAssignments.get(topic) match {
    case Some(assignmentsByPartition) =>
      assignmentsByPartition.map { case (partitionId, replicaIds) =>
        new TopicPartition(topic, partitionId) -> replicaIds
      }.toMap
    case None =>
      Map.empty
  }
}
/**
 * @return every partition that has an entry in `partitionAssignments`, across all topics
 */
def allPartitions: Set[TopicPartition] = {
  val partitions = for {
    (topic, assignmentsByPartition) <- partitionAssignments
    partitionId <- assignmentsByPartition.keys
  } yield new TopicPartition(topic, partitionId)
  partitions.toSet
}
/**
 * Replaces the registered brokers and their epochs with the given ones.
 *
 * Declared with an explicit `: Unit =` result type (rather than procedure syntax, which is
 * deprecated) so that it matches the other mutators of this class.
 *
 * @param brokerAndEpochs the complete new set of brokers, each mapped to its broker epoch
 */
def setLiveBrokerAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
  liveBrokers = brokerAndEpochs.keySet
  liveBrokerEpochs = brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }
}
/**
 * Adds the given brokers to the registered brokers and records their epochs by broker id.
 * An epoch already stored for the same broker id is overwritten.
 *
 * @param brokerAndEpochs the brokers to add, each mapped to its broker epoch
 */
def addLiveBrokersAndEpochs(brokerAndEpochs: Map[Broker, Long]): Unit = {
  val addedEpochsById = brokerAndEpochs.map { case (broker, brokerEpoch) => broker.id -> brokerEpoch }
  liveBrokers = liveBrokers ++ brokerAndEpochs.keySet
  liveBrokerEpochs = liveBrokerEpochs ++ addedEpochsById
}
/**
 * Removes the brokers with the given ids from the registered brokers and from the epoch map.
 *
 * The epoch map is rebuilt with a strict `filter`. `filterKeys` would only wrap the previous
 * map in a lazy view that keeps a reference to `brokerIds`: every call would add another
 * wrapper layer, and a later mutation of the caller's set would silently change which
 * epochs are visible.
 *
 * @param brokerIds ids of the brokers to remove; ids that are not registered are ignored
 */
def removeLiveBrokers(brokerIds: Set[Int]): Unit = {
  liveBrokers = liveBrokers.filter(broker => !brokerIds.contains(broker.id))
  liveBrokerEpochs = liveBrokerEpochs.filter { case (id, _) => !brokerIds.contains(id) }
}
/**
 * Swaps one broker entry for another in the registered broker set.
 * The epoch map is not touched.
 *
 * @param oldMetadata the entry to drop (no effect if it is not present)
 * @param newMetadata the entry to add
 */
def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
  liveBrokers = (liveBrokers - oldMetadata) + newMetadata
}
// Read-only accessors over the broker bookkeeping.

/** Ids of registered brokers, excluding those listed in `shuttingDownBrokerIds`. */
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet.diff(shuttingDownBrokerIds)

/** Ids of all registered brokers, whether or not they are shutting down. */
def liveOrShuttingDownBrokerIds: Set[Int] = liveBrokerEpochs.keySet

/** All registered brokers, whether or not they are shutting down. */
def liveOrShuttingDownBrokers: Set[Broker] = liveBrokers

/** Broker id -> broker epoch for all registered brokers. */
def liveBrokerIdAndEpochs: Map[Int, Long] = liveBrokerEpochs

/** The registered broker with the given id, if any. */
def liveOrShuttingDownBroker(brokerId: Int): Option[Broker] =
  liveBrokers.find(broker => broker.id == brokerId)
/**
 * @param brokerId the broker to look for
 * @return every partition whose recorded replica list contains `brokerId`
 */
def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
  val hostedPartitions = for {
    (topic, assignmentsByPartition) <- partitionAssignments
    (partitionId, replicaIds) <- assignmentsByPartition
    if replicaIds.contains(brokerId)
  } yield new TopicPartition(topic, partitionId)
  hostedPartitions.toSet
}
/**
 * Decides whether the replica of `topicPartition` on `brokerId` counts as online.
 *
 * @param brokerId                   the broker hosting the replica
 * @param topicPartition             the partition the replica belongs to
 * @param includeShuttingDownBrokers when true, brokers in `shuttingDownBrokerIds` still count
 *                                   as online brokers
 * @return true when the broker is in the selected broker id set and the partition is not
 *         listed for that broker in `replicasOnOfflineDirs`
 */
def isReplicaOnline(brokerId: Int, topicPartition: TopicPartition, includeShuttingDownBrokers: Boolean = false): Boolean = {
  val candidateBrokerIds =
    if (includeShuttingDownBrokers) liveOrShuttingDownBrokerIds
    else liveBrokerIds
  // The offline-dir lookup is only reached when the broker itself is online.
  candidateBrokerIds.contains(brokerId) &&
    !replicasOnOfflineDirs.get(brokerId).exists(offlinePartitions => offlinePartitions.contains(topicPartition))
}
/**
 * @param brokerIds the brokers to look for
 * @return one `PartitionAndReplica` for every (partition, broker) pair where the broker is in
 *         `brokerIds` and appears in the partition's recorded replica list
 */
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
  for {
    brokerId <- brokerIds
    (topic, assignmentsByPartition) <- partitionAssignments
    (partitionId, replicaIds) <- assignmentsByPartition
    if replicaIds.contains(brokerId)
  } yield PartitionAndReplica(new TopicPartition(topic, partitionId), brokerId)
}
/**
 * @param topic the topic name
 * @return every replica of every partition recorded under `topic`; empty when the topic has
 *         no entry in `partitionAssignments`
 */
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
  val replicas = for {
    assignmentsByPartition <- partitionAssignments.get(topic).toList
    (partitionId, replicaIds) <- assignmentsByPartition
    replicaId <- replicaIds
  } yield PartitionAndReplica(new TopicPartition(topic, partitionId), replicaId)
  replicas.toSet
}
/**
 * @param topic the topic name
 * @return the partitions recorded under `topic`; empty when the topic has no entry in
 *         `partitionAssignments`
 */
def partitionsForTopic(topic: String): collection.Set[TopicPartition] = {
  partitionAssignments.get(topic) match {
    case Some(assignmentsByPartition) =>
      assignmentsByPartition.keys.map(partitionId => new TopicPartition(topic, partitionId)).toSet
    case None =>
      Set.empty
  }
}
/**
 * @return the replicas hosted on brokers in `liveBrokerIds` that also pass `isReplicaOnline`
 *         (shutting-down brokers excluded)
 */
def allLiveReplicas(): Set[PartitionAndReplica] = {
  val replicasOnLiveBrokers = replicasOnBrokers(liveBrokerIds)
  replicasOnLiveBrokers.filter(r => isReplicaOnline(r.replica, r.topicPartition))
}
/**
 * Splits every recorded replica into online and offline, as decided by `isReplicaOnline`
 * with shutting-down brokers excluded.
 *
 * @return a pair whose first element holds the online replicas and whose second element
 *         holds the offline replicas
 */
def onlineAndOfflineReplicas: (Set[PartitionAndReplica], Set[PartitionAndReplica]) = {
  val online = mutable.Set.empty[PartitionAndReplica]
  val offline = mutable.Set.empty[PartitionAndReplica]
  partitionAssignments.foreach { case (topic, assignmentsByPartition) =>
    assignmentsByPartition.foreach { case (partitionId, replicaIds) =>
      val topicPartition = new TopicPartition(topic, partitionId)
      replicaIds.foreach { replicaId =>
        // Each replica lands in exactly one of the two buckets.
        val bucket = if (isReplicaOnline(replicaId, topicPartition)) online else offline
        bucket += PartitionAndReplica(topicPartition, replicaId)
      }
    }
  }
  (online, offline)
}
/**
 * @param partitions the partitions to expand
 * @return one `PartitionAndReplica` per replica in each partition's recorded assignment;
 *         partitions without a recorded assignment contribute nothing
 */
def replicasForPartition(partitions: collection.Set[TopicPartition]): collection.Set[PartitionAndReplica] = {
  for {
    partition <- partitions
    replicaId <- partitionReplicaAssignment(partition)
  } yield PartitionAndReplica(partition, replicaId)
}
/**
 * Returns the context to an empty state: epochs back to 0, no brokers, no topics,
 * no deletion bookkeeping.
 */
def resetContext(): Unit = {
  // Controller epoch bookkeeping.
  // NOTE(review): the field initializers use KafkaController.InitialControllerEpoch and
  // InitialControllerEpochZkVersion while this reset uses literal 0 - confirm they agree.
  epoch = 0
  epochZkVersion = 0

  // Broker bookkeeping.
  shuttingDownBrokerIds.clear()
  setLiveBrokerAndEpochs(Map.empty)

  // Topic deletion bookkeeping.
  Seq(topicsToBeDeleted, topicsWithDeletionStarted, topicsIneligibleForDeletion).foreach(_.clear())

  // Topic, partition and replica state.
  clearTopicsState()
}
/**
 * Forgets a topic: removes it from `allTopics`, drops its replica assignments and drops the
 * leadership info of all its partitions.
 *
 * @param topic the topic to remove; unknown topics are ignored
 */
def removeTopic(topic: String): Unit = {
  allTopics -= topic
  partitionAssignments.remove(topic)
  // Take a snapshot of the affected keys first: removing entries from a mutable map while
  // iterating over that same map is not safe.
  val stalePartitions = partitionLeadershipInfo.keys.filter(_.topic == topic).toList
  partitionLeadershipInfo --= stalePartitions
}
/** Adds `topics` to the set of topics queued for deletion. */
def queueTopicDeletion(topics: Set[String]): Unit = {
  topics.foreach(topic => topicsToBeDeleted += topic)
}

/** Adds `topics` to the set of topics whose deletion has started. */
def beginTopicDeletion(topics: Set[String]): Unit = {
  topics.foreach(topic => topicsWithDeletionStarted += topic)
}

/** @return true when `topic` is in the set of topics whose deletion has started */
def isTopicDeletionInProgress(topic: String): Boolean =
  topicsWithDeletionStarted(topic)

/** @return true when `topic` is in the set of topics queued for deletion */
def isTopicQueuedUpForDeletion(topic: String): Boolean =
  topicsToBeDeleted(topic)

/** @return true when `topic` is queued for deletion and not marked ineligible */
def isTopicEligibleForDeletion(topic: String): Boolean = {
  val queued = topicsToBeDeleted(topic)
  queued && !topicsIneligibleForDeletion(topic)
}

/** @return the underlying set of queued topics itself (not a copy) */
def topicsQueuedForDeletion: Set[String] = topicsToBeDeleted
/**
 * @return the replicas of `topic` whose recorded state equals `state`.
 *         Throws NoSuchElementException if a replica of the topic has no recorded state.
 */
def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] = {
  val topicReplicas = replicasForTopic(topic)
  topicReplicas.filter(replica => replicaState(replica) == state).toSet
}

/**
 * @return true when every replica of `topic` is in `state` (vacuously true for a topic
 *         without replicas). Throws NoSuchElementException if a visited replica has no
 *         recorded state.
 */
def areAllReplicasInState(topic: String, state: ReplicaState): Boolean = {
  !replicasForTopic(topic).exists(replica => replicaState(replica) != state)
}

/**
 * @return true when at least one replica of `topic` is in `state`.
 *         Throws NoSuchElementException if a visited replica has no recorded state.
 */
def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
  val topicReplicas = replicasForTopic(topic)
  topicReplicas.exists(replica => replicaState(replica) == state)
}
/**
 * Splits `replicas` by whether each one may move to `targetState` from its current state.
 *
 * @return (replicas with a valid transition, replicas with an invalid transition)
 */
def checkValidReplicaStateChange(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) = {
  def canTransition(replica: PartitionAndReplica): Boolean =
    isValidReplicaStateTransition(replica, targetState)
  replicas.partition(canTransition)
}

/**
 * Splits `partitions` by whether each one may move to `targetState` from its current state.
 *
 * @return (partitions with a valid transition, partitions with an invalid transition)
 */
def checkValidPartitionStateChange(partitions: Seq[TopicPartition], targetState: PartitionState): (Seq[TopicPartition], Seq[TopicPartition]) = {
  def canTransition(partition: TopicPartition): Boolean =
    isValidPartitionStateTransition(partition, targetState)
  partitions.partition(canTransition)
}
/** Records `state` for `replica`, overwriting any previous state. */
def putReplicaState(replica: PartitionAndReplica, state: ReplicaState): Unit = {
  replicaStates(replica) = state
}

/** Forgets the state of `replica`; no effect when none is recorded. */
def removeReplicaState(replica: PartitionAndReplica): Unit = {
  replicaStates -= replica
}

/** Records `state` for `replica` only when the replica has no state yet. */
def putReplicaStateIfNotExists(replica: PartitionAndReplica, state: ReplicaState): Unit = {
  if (!replicaStates.contains(replica)) {
    replicaStates(replica) = state
  }
}
/**
 * Records `targetState` for `partition` and adjusts the offline partition counter for the
 * transition. A partition without a recorded state is treated as coming from
 * NonExistentPartition.
 */
def putPartitionState(partition: TopicPartition, targetState: PartitionState): Unit = {
  val previousState = partitionStates.getOrElse(partition, NonExistentPartition)
  partitionStates(partition) = targetState
  updatePartitionStateMetrics(partition, previousState, targetState)
}
/**
 * Keeps `offlinePartitionCount` in step with a partition state transition: +1 when the
 * partition enters OfflinePartition, -1 when it leaves it, unchanged otherwise.
 * Transitions of topics whose deletion has started are ignored.
 */
private def updatePartitionStateMetrics(partition: TopicPartition,
                                        currentState: PartitionState,
                                        targetState: PartitionState): Unit = {
  if (!isTopicDeletionInProgress(partition.topic)) {
    val wasOffline = currentState == OfflinePartition
    val becomesOffline = targetState == OfflinePartition
    if (!wasOffline && becomesOffline)
      offlinePartitionCount += 1
    else if (wasOffline && !becomesOffline)
      offlinePartitionCount -= 1
  }
}
/**
 * Records `state` for `partition` only when the partition has no state yet, and in that case
 * accounts for the NonExistentPartition -> `state` transition in the offline partition counter.
 *
 * The metrics update is tied to the actual insertion. Comparing the stored state with `state`
 * would also match a partition that already existed in that same state, counting an
 * already-offline partition a second time.
 *
 * @param partition the partition to initialise
 * @param state     the state to record when none exists
 */
def putPartitionStateIfNotExists(partition: TopicPartition, state: PartitionState): Unit = {
  if (!partitionStates.contains(partition)) {
    partitionStates.put(partition, state)
    updatePartitionStateMetrics(partition, NonExistentPartition, state)
  }
}
/**
 * @return the recorded state of `replica`.
 *         Throws NoSuchElementException when no state is recorded for it.
 */
def replicaState(replica: PartitionAndReplica): ReplicaState = {
replicaStates(replica)
}
/**
 * @return the recorded state of `partition`.
 *         Throws NoSuchElementException when no state is recorded for it.
 */
def partitionState(partition: TopicPartition): PartitionState = {
partitionStates(partition)
}
/** @return every partition, across all topics, whose recorded state equals `state` */
def partitionsInState(state: PartitionState): Set[TopicPartition] = {
  partitionStates.collect { case (partition, s) if s == state => partition }.toSet
}

/** @return every partition, across all topics, whose recorded state is one of `states` */
def partitionsInStates(states: Set[PartitionState]): Set[TopicPartition] = {
  partitionStates.collect { case (partition, s) if states.contains(s) => partition }.toSet
}

/**
 * @return the partitions of `topic` whose recorded state equals `state`.
 *         Throws NoSuchElementException if a partition of the topic has no recorded state.
 */
def partitionsInState(topic: String, state: PartitionState): Set[TopicPartition] = {
  val topicPartitions = partitionsForTopic(topic)
  topicPartitions.filter(partition => state == partitionStates(partition)).toSet
}

/**
 * @return the partitions of `topic` whose recorded state is one of `states`.
 *         Throws NoSuchElementException if a partition of the topic has no recorded state.
 */
def partitionsInStates(topic: String, states: Set[PartitionState]): Set[TopicPartition] = {
  val topicPartitions = partitionsForTopic(topic)
  topicPartitions.filter(partition => states.contains(partitionStates(partition))).toSet
}
/**
 * @return true when the replica's recorded state is listed in `targetState.validPreviousStates`.
 *         Throws NoSuchElementException when the replica has no recorded state.
 */
private def isValidReplicaStateTransition(replica: PartitionAndReplica, targetState: ReplicaState): Boolean = {
  val allowedPreviousStates = targetState.validPreviousStates
  allowedPreviousStates.contains(replicaState(replica))
}

/**
 * @return true when the partition's recorded state is listed in `targetState.validPreviousStates`.
 *         Throws NoSuchElementException when the partition has no recorded state.
 */
private def isValidPartitionStateTransition(partition: TopicPartition, targetState: PartitionState): Boolean = {
  val allowedPreviousStates = targetState.validPreviousStates
  allowedPreviousStates.contains(partitionState(partition))
}
}