/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server

import kafka.cluster.Partition
import kafka.server.ReplicaAlterLogDirsThread.{PromotionState, ReassignmentState}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch, TopicIdPartition}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}

import java.util.concurrent.ConcurrentHashMap
import scala.collection.{Map, Set}
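
/**
 * A fetcher thread that moves a partition's data between log directories on the same broker: it fetches from
 * the current (source) log and appends the records to the future log in the destination directory. Once the
 * future log has caught up, the thread promotes it to become the current replica. When a DirectoryEventHandler
 * other than NOOP is configured, the promotion additionally waits for the controller to accept the updated
 * replica-to-directory assignment.
 */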
class ReplicaAlterLogDirsThread(name: String,
                                leader: LeaderEndPoint,
                                failedPartitions: FailedPartitions,
                                replicaMgr: ReplicaManager,
                                quota: ReplicationQuotaManager,
                                brokerTopicStats: BrokerTopicStats,
                                fetchBackOffMs: Int,
                                directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
                               )
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                leader = leader,
                                failedPartitions,
                                fetchTierStateMachine = new TierStateMachine(leader, replicaMgr, true),
                                fetchBackOffMs = fetchBackOffMs,
                                isInterruptible = false,
                                brokerTopicStats) {
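
  // Tracks, for each partition being moved, how far the promotion of its future replica has progressed:
  // the state of the directory reassignment with the controller, the topic ID, and the original directory
  // to revert to if the promotion is cancelled.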
  // Visible for testing
  private[server] val promotionStates: ConcurrentHashMap[TopicPartition, PromotionState] = new ConcurrentHashMap()

  override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
    replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
  }

  override protected def logStartOffset(topicPartition: TopicPartition): Long = {
    replicaMgr.futureLocalLogOrException(topicPartition).logStartOffset
  }

  override protected def logEndOffset(topicPartition: TopicPartition): Long = {
    replicaMgr.futureLocalLogOrException(topicPartition).logEndOffset
  }

  override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
    replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
  }

  // Process the fetched data: append it to the future replica's log, advance the future log's high
  // watermark and log start offset, and, once the future log has caught up, promote it to replace the
  // current replica.
  override def processPartitionData(topicPartition: TopicPartition,
                                    fetchOffset: Long,
                                    partitionData: FetchData): Option[LogAppendInfo] = {
    val partition = replicaMgr.getPartitionOrException(topicPartition)
    val futureLog = partition.futureLocalLogOrException
    val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData))

    if (fetchOffset != futureLog.logEndOffset)
      throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
        topicPartition, fetchOffset, futureLog.logEndOffset))

    val logAppendInfo = if (records.sizeInBytes() > 0)
      partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
    else
      None

    futureLog.updateHighWatermark(partitionData.highWatermark)
    futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented)

    directoryEventHandler match {
      case DirectoryEventHandler.NOOP =>
        // Without a directory event handler, swap the current replica with the future replica as soon as it catches up.
        if (partition.maybeReplaceCurrentWithFutureReplica())
          removePartitions(Set(topicPartition))
      case _ =>
        // Otherwise, promotion must be coordinated with the controller via a directory reassignment.
        maybePromoteFutureReplica(topicPartition, partition)
    }

    quota.record(records.sizeInBytes)
    logAppendInfo
  }
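
  /**
   * In addition to stopping the fetcher for the given partitions, drops their promotion state and, for any
   * partition whose directory reassignment was already queued or accepted but whose promotion did not
   * complete, reverts the assignment back to the original directory.
   */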
  override def removePartitions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, PartitionFetchState] = {
    for (topicPartition <- topicPartitions) {
      if (this.promotionStates.containsKey(topicPartition)) {
        val PromotionState(reassignmentState, topicId, originalDir) = this.promotionStates.get(topicPartition)
        // Revert any reassignments for partitions that did not complete the future replica promotion
        if (originalDir.isDefined && topicId.isDefined && reassignmentState.maybeInconsistentMetadata) {
          directoryEventHandler.handleAssignment(new TopicIdPartition(topicId.get, topicPartition.partition()), originalDir.get,
            "Reverting reassignment for canceled future replica", () => ())
        }
        this.promotionStates.remove(topicPartition)
      }
    }

    super.removePartitions(topicPartitions)
  }

  private def reassignmentState(topicPartition: TopicPartition): ReassignmentState =
    promotionStates.get(topicPartition).reassignmentState

  // Visible for testing
  private[server] def updateReassignmentState(topicPartition: TopicPartition, state: ReassignmentState): Unit = {
    log.debug(s"Updating future replica $topicPartition reassignment state to $state")
    promotionStates.put(topicPartition, promotionStates.get(topicPartition).withAssignment(state))
  }
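
  /**
   * Drives the promotion of the future replica through the reassignment states: once the future log has
   * caught up, an assignment to the future directory is queued (None -> Queued); when the controller accepts
   * it (Queued -> Accepted), the current replica is replaced with the future one (Accepted -> Effective) and
   * the partition is removed from this fetcher.
   */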
  private def maybePromoteFutureReplica(topicPartition: TopicPartition, partition: Partition) = {
    val topicId = partition.topicId
    if (topicId.isEmpty)
      throw new IllegalStateException(s"Topic ${topicPartition.topic()} does not have an ID.")

    reassignmentState(topicPartition) match {
      case ReassignmentState.None =>
        // Schedule the assignment request; don't promote the future replica until the controller has accepted the request.
        partition.runCallbackIfFutureReplicaCaughtUp(_ => {
          val targetDir = partition.futureReplicaDirectoryId().get
          val topicIdPartition = new TopicIdPartition(topicId.get, topicPartition.partition())
          directoryEventHandler.handleAssignment(topicIdPartition, targetDir, "Future replica promotion",
            () => updateReassignmentState(topicPartition, ReassignmentState.Accepted))
          updateReassignmentState(topicPartition, ReassignmentState.Queued)
        })
      case ReassignmentState.Accepted =>
        // Promote the future replica once the controller has accepted the request and the replica has caught up with the original log.
        if (partition.maybeReplaceCurrentWithFutureReplica()) {
          updateReassignmentState(topicPartition, ReassignmentState.Effective)
          removePartitions(Set(topicPartition))
        }
      case ReassignmentState.Queued =>
        log.trace("Waiting for AssignReplicasToDirsRequest to succeed before promoting the future replica.")
      case ReassignmentState.Effective =>
        throw new IllegalStateException("BUG: trying to promote a future replica twice")
    }
  }
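
  /**
   * Registers the given partitions with this fetcher, skipping any partition whose future log no longer
   * exists. For each remaining partition the initial PromotionState (ReassignmentState.None, the topic ID and
   * the current directory, kept so a queued reassignment can be reverted) is recorded before fetching starts.
   */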
  override def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
    partitionMapLock.lockInterruptibly()
    try {
      // It is possible that the log dir fetcher completed just before this call, so we
      // filter only the partitions which still have a future log dir.
      val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
        replicaMgr.futureLogExists(tp)
      }

      filteredFetchStates.foreach { case (topicPartition, state) =>
        val topicId = state.topicId
        val currentDirectoryId = replicaMgr.getPartitionOrException(topicPartition).logDirectoryId()
        val promotionState = PromotionState(ReassignmentState.None, topicId, currentDirectoryId)
        promotionStates.put(topicPartition, promotionState)
      }

      super.addPartitions(filteredFetchStates)
    } finally {
      partitionMapLock.unlock()
    }
  }

  override protected val isOffsetForLeaderEpochSupported: Boolean = true

  /**
   * Truncate the log for each partition based on the current replica's returned epoch and offset.
   *
   * The logic for finding the truncation offset is the same as in ReplicaFetcherThread
   * and is mainly implemented in AbstractFetcherThread.getOffsetTruncationState. One difference is
   * that the initial fetch offset for a topic partition could be set to the truncation offset of
   * the current replica if that replica truncates. Otherwise, it is the high watermark, as in ReplicaFetcherThread.
   *
   * The reason we have to follow the leader epoch approach for truncating a future replica is to
   * cover the case where a future replica is offline when the current replica truncates and
   * re-replicates offsets that may have already been copied to the future replica. In that case,
   * the future replica may miss the "mark for truncation" event and must use the offset for leader epoch
   * exchange with the current replica to truncate to the largest common log prefix for the topic partition.
   */
  override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
    val partition = replicaMgr.getPartitionOrException(topicPartition)
    partition.truncateTo(truncationState.offset, isFuture = true)
  }

  override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
    val partition = replicaMgr.getPartitionOrException(topicPartition)
    partition.truncateFullyAndStartAt(offset, isFuture = true)
  }
}

object ReplicaAlterLogDirsThread {

  /**
   * @param reassignmentState Tracks the state of the replica-to-directory assignment update in the metadata
   * @param topicId           The ID of the topic, which is useful if reverting the assignment is required
   * @param currentDir        The ID of the original directory that the future replica fetches from
   */
  case class PromotionState(reassignmentState: ReassignmentState, topicId: Option[Uuid], currentDir: Option[Uuid]) {
    def withAssignment(newDirReassignmentState: ReassignmentState): PromotionState =
      PromotionState(newDirReassignmentState, topicId, currentDir)
  }

  /**
   * Represents the state of the request to update the directory assignment from the current replica directory
   * to the future replica directory.
   */
  sealed trait ReassignmentState {

    /**
     * @return true if the directory assignment in the cluster metadata may be inconsistent with the actual
     *         directory where the main replica is hosted.
     */
    def maybeInconsistentMetadata: Boolean = false
  }
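
  /**
   * Expected lifecycle, as driven by maybePromoteFutureReplica: None -> Queued (assignment handed to the
   * directoryEventHandler) -> Accepted (controller persisted the new assignment) -> Effective (future replica
   * promoted). If a partition is removed while Queued or Accepted, removePartitions reverts the assignment to
   * the original directory.
   */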
  object ReassignmentState {

    /**
     * The request has not been created.
     */
    case object None extends ReassignmentState

    /**
     * The request has been queued; it may or may not yet have been sent to the Controller.
     */
    case object Queued extends ReassignmentState {
      override def maybeInconsistentMetadata: Boolean = true
    }

    /**
     * The controller has acknowledged the new directory assignment and persisted the change in metadata.
     */
    case object Accepted extends ReassignmentState {
      override def maybeInconsistentMetadata: Boolean = true
    }

    /**
     * The future replica has been promoted and has replaced the current replica.
     */
    case object Effective extends ReassignmentState
  }
}