* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package kafka.server
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import kafka.api.LeaderAndIsr
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.OperationNotAttemptedException
import org.apache.kafka.common.message.AlterPartitionRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.Scheduler
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
* Handles updating the ISR by sending AlterPartition requests to the controller (as of 2.7) or by updating ZK directly
* (prior to 2.7). Updating the ISR is an asynchronous operation, so partitions learn about the result of their
* request through the CompletableFuture returned by `submit`.
* Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr
* requests.
// Contract for submitting ISR (LeaderAndIsr) changes asynchronously.
trait AlterPartitionManager {
// Lifecycle hook; a no-op unless an implementation overrides it.
def start(): Unit = {}
// Lifecycle hook; a no-op unless an implementation overrides it.
def shutdown(): Unit = {}
// Submits an ISR change for a partition.
// @param topicIdPartition partition whose ISR is being changed
// @param leaderAndIsr     the proposed leader/ISR state
// @param controllerEpoch  controller epoch carried along with the change (used by the ZK-based implementation)
// @return a future for the outcome of the change; it may be completed exceptionally
def submit(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
): CompletableFuture[LeaderAndIsr]
// A submitted ISR change, held while it is pending or in flight, together with the
// future handed back to the submitter. The manager completes `future` (normally or
// exceptionally) once the outcome for this partition is known.
case class AlterPartitionItem(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
future: CompletableFuture[LeaderAndIsr],
controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager`
// Factory methods for the AlterPartitionManager implementations.
object AlterPartitionManager {
* Factory to AlterPartition based implementation, used when IBP >= 2.7-IV2
// Builds a DefaultAlterPartitionManager that talks to the controller through its own
// NodeToControllerChannelManager named "alter-partition".
// @param brokerEpochSupplier evaluated each time a request is sent, so the current epoch is used
def apply(
config: KafkaConfig,
metadataCache: MetadataCache,
scheduler: Scheduler,
controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
threadNamePrefix: String,
brokerEpochSupplier: () => Long,
): AlterPartitionManager = {
val channelManager = new NodeToControllerChannelManagerImpl(
time = time,
metrics = metrics,
config = config,
channelName = "alter-partition",
threadNamePrefix = threadNamePrefix,
// Effectively no timeout: AlterPartition requests are retried until a response arrives.
retryTimeoutMs = Long.MaxValue
new DefaultAlterPartitionManager(
controllerChannelManager = channelManager,
scheduler = scheduler,
time = time,
brokerId = config.brokerId,
brokerEpochSupplier = brokerEpochSupplier,
// Re-read from the metadata cache on every call rather than captured once.
metadataVersionSupplier = () => metadataCache.metadataVersion()
* Factory for ZK based implementation, used when IBP < 2.7-IV2
// Builds the ZK-based implementation.
def apply(
scheduler: Scheduler,
time: Time,
zkClient: KafkaZkClient
): AlterPartitionManager = {
new ZkAlterPartitionManager(scheduler, time, zkClient)
// AlterPartitionManager that batches pending ISR changes into AlterPartition requests sent over
// `controllerChannelManager`, keeping at most one request in flight at a time.
// @param brokerEpochSupplier     queried for the broker epoch each time a request is sent
// @param metadataVersionSupplier queried each time a request is built; decides whether topic ids
//                                and leader recovery state can be used
class DefaultAlterPartitionManager(
val controllerChannelManager: NodeToControllerChannelManager,
val scheduler: Scheduler,
val time: Time,
val brokerId: Int,
val brokerEpochSupplier: () => Long,
val metadataVersionSupplier: () => MetadataVersion
) extends AlterPartitionManager with Logging {
// Used to allow only one pending ISR update per partition (visible for testing).
// Note that we key items by TopicPartition despite using TopicIdPartition while
// submitting changes. We do this to ensure that topics with the same name but
// with a different topic id or no topic id collide here. There are two cases to
// consider:
// 1) When the cluster is upgraded from IBP < 2.8 to IBP >= 2.8, the ZK controller
// assigns topic ids to the partitions. So partitions will start sending updates
// with a topic id while they might still have updates without topic ids in this
// Map. This would break the contract of only allowing one pending ISR update per
// partition.
// 2) When a topic is deleted and re-created, we cannot have two entries in this Map
// especially if we cannot use an AlterPartition request version which supports
// topic ids in the end because the two updates with the same name would be merged
// together.
private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]()
// Used to allow only one in-flight request at a time
// Set with compareAndSet(false, true) before a request is sent; reset by clearInFlightRequest().
private val inflightRequest: AtomicBoolean = new AtomicBoolean(false)
// Overrides the trait's no-op start() lifecycle hook.
override def start(): Unit = {
// Overrides the trait's no-op shutdown() lifecycle hook.
override def shutdown(): Unit = {
// Enqueues an ISR change for the partition.
// Only one pending update per TopicPartition is allowed (see unsentIsrUpdates). If one is
// already pending, this request's future is failed immediately with
// OperationNotAttemptedException and nothing is enqueued.
// Otherwise the future is completed later, when the response for this partition is handled.
override def submit(
topicIdPartition: TopicIdPartition,
leaderAndIsr: LeaderAndIsr,
controllerEpoch: Int
): CompletableFuture[LeaderAndIsr] = {
val future = new CompletableFuture[LeaderAndIsr]()
val alterPartitionItem = AlterPartitionItem(topicIdPartition, leaderAndIsr, future, controllerEpoch)
// putIfAbsent returns null only when no update was already pending for this TopicPartition.
val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicIdPartition.topicPartition, alterPartitionItem) == null
if (enqueued) {
} else {
future.completeExceptionally(new OperationNotAttemptedException(
s"Failed to enqueue ISR change state $leaderAndIsr for partition $topicIdPartition"))
// Starts sending the pending ISR changes when there are any and no AlterPartition request is
// currently in flight. The in-flight slot is claimed atomically with compareAndSet(false, true),
// so concurrent callers cannot both send.
private[server] def maybePropagateIsrChanges(): Unit = {
// Send all pending items if there is not already a request in-flight.
if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
// Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler
val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]()
unsentIsrUpdates.values.forEach(item => inflightAlterPartitionItems.append(item))
// Releases the in-flight slot so the next maybePropagateIsrChanges() call can send a request.
// If the flag was not set, this only logs a warning; it does not throw.
private[server] def clearInFlightRequest(): Unit = {
if (!inflightRequest.compareAndSet(true, false)) {
warn("Attempting to clear AlterPartition in-flight flag when no apparent request is in-flight")
// Builds an AlterPartition request for the given items with the current broker epoch and
// attaches a completion handler. On completion the handler derives a top-level error. A NONE
// result checks for more pending updates right away; any other error schedules a retry.
private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = {
val brokerEpoch = brokerEpochSupplier()
val (request, topicNamesByIds) = buildRequest(inflightAlterPartitionItems, brokerEpoch)
debug(s"Sending AlterPartition to controller $request")
// We will not timeout AlterPartition request, instead letting it retry indefinitely
// until a response is received, or a new LeaderAndIsr overwrites the existing isrState
// which causes the response for those partitions to be ignored.
new ControllerRequestCompletionHandler {
override def onComplete(response: ClientResponse): Unit = {
debug(s"Received AlterPartition response $response")
// Top-level outcome of this exchange; used below to decide whether to retry.
val error = try {
if (response.authenticationException != null) {
// For now we treat authentication errors as retriable. We use the
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
// Note that `NodeToControllerChannelManager` will still log the
// authentication errors so that users have a chance to fix the problem.
} else if (response.versionMismatch != null) {
} else {
} finally {
// clear the flag so future requests can proceed
// check if we need to send another request right away
error match {
case Errors.NONE =>
// In the normal case, check for pending updates to send immediately
case _ =>
// If we received a top-level error from the controller, retry the request in the near future
// The delay argument (50) is presumably in milliseconds -- confirm against Scheduler.scheduleOnce.
scheduler.scheduleOnce("send-alter-partition", () => maybePropagateIsrChanges(), 50)
// These requests are not expected to time out (see the comment above and the
// Long.MaxValue retry timeout in the factory), so a timeout is treated as a broken invariant.
override def onTimeout(): Unit = {
throw new IllegalStateException("Encountered unexpected timeout when sending AlterPartition to the controller")
* Builds an AlterPartition request.
* While building the request, we don't know which version of the AlterPartition API is
* supported by the controller. The final decision is taken when the AlterPartitionRequest
* is built in the network client based on the advertised api versions of the controller.
* We can use version 2 or above only if all the pending changes have a topic id defined;
* otherwise we must use version 1 or below.
* @return A tuple containing the AlterPartitionRequest.Builder and a mapping from
* topic id to topic name. This mapping is used in the response handling.
private def buildRequest(
inflightAlterPartitionItems: Seq[AlterPartitionItem],
brokerEpoch: Long
): (AlterPartitionRequest.Builder, mutable.Map[Uuid, String]) = {
// Read once so the whole request is built against a single metadata version.
val metadataVersion = metadataVersionSupplier()
// We build this mapping in order to map topic id back to their name when we
// receive the response. We cannot rely on the metadata cache for this because
// the metadata cache is updated after the partition state so it might not know
// yet about a topic id already used here.
val topicNamesByIds = mutable.HashMap[Uuid, String]()
// We can use topic ids only if all the pending changes have one defined and
// we use IBP 2.8 or above.
var canUseTopicIds = metadataVersion.isTopicIdsSupported
val message = new AlterPartitionRequestData()
// One TopicData per topic name; the topic id is taken from the first item of each group.
// NOTE(review): this assumes every item sharing a topic name also shares its topic id -- confirm.
inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
val topicId = items.head.topicIdPartition.topicId
// A single topic without an id (ZERO_UUID) disables topic ids for the whole request.
canUseTopicIds &= topicId != Uuid.ZERO_UUID
topicNamesByIds(topicId) = topicName
// Both the topic name and the topic id are set here because at this stage
// we don't know which version of the request will be used.
val topicData = new AlterPartitionRequestData.TopicData()
items.foreach { item =>
val partitionData = new AlterPartitionRequestData.PartitionData()
// Gated on leader-recovery support in the current metadata version.
if (metadataVersion.isLeaderRecoverySupported) {
// If we cannot use topic ids, the builder will ensure that no version higher than 1 is used.
(new AlterPartitionRequest.Builder(message, canUseTopicIds), topicNamesByIds)
// Handles an AlterPartition response. When the top-level error is NONE, it completes the
// future of every sent item from its per-partition result. Items missing from the response
// are left in the pending map so they are re-sent.
// @param requestHeader     its apiVersion decides whether response topics are identified by id (> 1) or by name
// @param sentBrokerEpoch   broker epoch the request was sent with (reported on a stale-epoch warning)
// @param inflightAlterPartitionItems the items that were included in the request
// @param topicNamesByIds   id -> name mapping captured when the request was built
// @return an Errors value; sendRequest retries later for anything other than NONE
private def handleAlterPartitionResponse(
requestHeader: RequestHeader,
alterPartitionResp: AlterPartitionResponse,
sentBrokerEpoch: Long,
inflightAlterPartitionItems: Seq[AlterPartitionItem],
topicNamesByIds: mutable.Map[Uuid, String]
): Errors = {
val data =
// Dispatch on the response's top-level error code.
Errors.forCode(data.errorCode) match {
warn(s"Broker had a stale broker epoch ($sentBrokerEpoch), retrying.")
error(s"Broker is not authorized to send AlterPartition to controller",
Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is not authorized to send AlterPartition to controller"))
case Errors.NONE =>
// Collect partition-level responses to pass to the callbacks
val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
data.topics.forEach { topic =>
// Topic IDs are used since version 2 of the AlterPartition API.
val topicName = if (requestHeader.apiVersion > 1) topicNamesByIds.get(topic.topicId).orNull else topic.topicName
if (topicName == null || topicName.isEmpty) {
error(s"Received an unexpected topic $topic in the alter partition response, ignoring it.")
} else {
topic.partitions.forEach { partition =>
val tp = new TopicPartition(topicName, partition.partitionIndex)
val apiError = Errors.forCode(partition.errorCode)
// NOTE(review): this "successfully handled" line is also logged when apiError is not NONE.
debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
if (apiError == Errors.NONE) {
// An unrecognized leader recovery state is reported as UNKNOWN_SERVER_ERROR for this partition.
LeaderRecoveryState.optionalOf(partition.leaderRecoveryState).asScala match {
case Some(leaderRecoveryState) =>
partitionResponses(tp) = Right(
case None =>
error(s"Controller returned an invalid leader recovery state (${partition.leaderRecoveryState}) for $tp: $partition")
partitionResponses(tp) = Left(Errors.UNKNOWN_SERVER_ERROR)
} else {
partitionResponses(tp) = Left(apiError)
// Iterate across the items we sent rather than what we received to ensure we run the callback even if a
// partition was somehow erroneously excluded from the response. Note that these callbacks are run from
// the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest
inflightAlterPartitionItems.foreach { inflightAlterPartition =>
partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
case Some(leaderAndIsrOrError) =>
// Regardless of callback outcome, we need to clear from the unsent updates map to unblock further
// updates. We clear it now to allow the callback to submit a new update if needed.
leaderAndIsrOrError match {
case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception)
case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr)
case None =>
// Don't remove this partition from the update map so it will get re-sent
warn(s"Partition ${inflightAlterPartition.topicIdPartition} was sent but not included in the response")
// Any other top-level error is only logged here (assuming it is what this method returns, sendRequest then schedules a retry).
case e =>
warn(s"Controller returned an unexpected top-level error when handling AlterPartition request: $e")