/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util
import java.util.Optional
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.{LogAppendInfo, LogOffsetSnapshot}
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
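
/**
 * Thread which moves replica data between log directories on the same broker. Rather than
 * fetching from a remote leader, it reads from the current (local) replica through the
 * ReplicaManager and appends the records to the partition's future replica until the future
 * log has caught up and can replace the current log.
 */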
class ReplicaAlterLogDirsThread(name: String,
sourceBroker: BrokerEndPoint,
brokerConfig: KafkaConfig,
failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager,
quota: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats)
extends AbstractFetcherThread(name = name,
clientId = name,
sourceBroker = sourceBroker,
failedPartitions,
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
isInterruptible = false) {
private val replicaId = brokerConfig.brokerId
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
private var inProgressPartition: Option[TopicPartition] = None
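
// The fetcher's view of the log (epochs and offsets) is taken from the future replica, since
// that is the log this thread is building up.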
override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch
}
override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.futureLocalReplicaOrException(topicPartition).logEndOffset
}
override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
replicaMgr.futureLocalReplicaOrException(topicPartition).endOffsetForEpoch(epoch)
}
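
// The "leader" for this thread is the current replica on the local broker, so the fetch is
// served directly by ReplicaManager.fetchMessages. With a timeout of 0 the callback is invoked
// before fetchMessages returns, so partitionData is populated synchronously.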
def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
val request = fetchRequest.build()
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
partitionData = responsePartitionData.map { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
tp -> new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
data.logStartOffset, abortedTransactions, data.records)
}
}
replicaMgr.fetchMessages(
0L, // timeout is 0 so that the callback will be executed immediately
Request.FutureLocalReplicaId,
request.minBytes,
request.maxBytes,
false,
request.fetchData.asScala.toSeq,
UnboundedQuota,
processResponseCallback,
request.isolationLevel)
if (partitionData == null)
throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
partitionData
}
// Process fetched data: verify that the fetch offset matches the future replica's log end offset,
// append the records to the future log, advance its high watermark and log start offset, and
// promote the future replica to current (removing the partition from this thread) once it has
// caught up with the current log.
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
val futureReplica = replicaMgr.futureLocalReplicaOrException(topicPartition)
val partition = replicaMgr.getPartition(topicPartition).get
val records = toMemoryRecords(partitionData.records)
if (fetchOffset != futureReplica.logEndOffset)
throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, futureReplica.logEndOffset))
val logAppendInfo = if (records.sizeInBytes() > 0)
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
else
None
val futureReplicaHighWatermark = futureReplica.logEndOffset.min(partitionData.highWatermark)
futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
if (partition.maybeReplaceCurrentWithFutureReplica())
removePartitions(Set(topicPartition))
quota.record(records.sizeInBytes)
logAppendInfo
}
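
// Adds partitions to move, skipping any whose future log no longer exists (for example because
// an earlier move for that partition has already completed).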
override def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
partitionMapLock.lockInterruptibly()
try {
// It is possible that the log dir fetcher completed just before this call, so we
// filter only the partitions which still have a future log dir.
val filteredFetchStates = initialFetchStates.filter { case (tp, _) =>
replicaMgr.futureLogExists(tp)
}
super.addPartitions(filteredFetchStates)
} finally {
partitionMapLock.unlock()
}
}
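
// The earliest and latest offsets are read from the current (local) replica rather than a
// remote leader, using an offset snapshot of the partition.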
override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
offsetSnapshot.logStartOffset
}
override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
offsetSnapshot.logEndOffset.messageOffset
}
private def offsetSnapshotFromCurrentReplica(topicPartition: TopicPartition, leaderEpoch: Int): LogOffsetSnapshot = {
val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
partition.fetchOffsetSnapshot(Optional.of[Integer](leaderEpoch), fetchOnlyFromLeader = false)
}
/**
 * Fetches the end offset for the requested leader epoch from the local replica for each of the
 * given topic partitions.
 * @param partitions map of topic partition -> leader epoch of the future replica
 * @return map of topic partition -> end offset for the requested leader epoch
 */
override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
partitions.map { case (tp, epochData) =>
try {
val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) {
new EpochEndOffset(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else {
val partition = replicaMgr.getPartitionOrException(tp, expectLeader = false)
partition.lastOffsetForLeaderEpoch(
currentLeaderEpoch = epochData.currentLeaderEpoch,
leaderEpoch = epochData.leaderEpoch,
fetchOnlyFromLeader = false)
}
tp -> endOffset
} catch {
case t: Throwable =>
warn(s"Error when getting EpochEndOffset for $tp", t)
tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
}
}
}
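
// The "leader" is the local current replica, so the offset-for-leader-epoch lookup is always supported.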
override protected def isOffsetForLeaderEpochSupported: Boolean = true
/**
 * Truncates the log for each partition based on the current replica's returned epoch and offset.
 *
 * The logic for finding the truncation offset is the same as in ReplicaFetcherThread and is
 * mainly implemented in AbstractFetcherThread.getOffsetTruncationState. One difference is
 * that the initial fetch offset for a topic partition may be set to the truncation offset of
 * the current replica if that replica truncates; otherwise, it is the high watermark, as in
 * ReplicaFetcherThread.
 *
 * The reason we have to follow the leader epoch approach for truncating a future replica is to
 * cover the case where the future replica is offline when the current replica truncates and
 * re-replicates offsets that may have already been copied to the future replica. In that case,
 * the future replica may miss the "mark for truncation" event and must use the offset-for-leader-epoch
 * exchange with the current replica to truncate to the largest common log prefix for the topic partition.
 */
override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
partition.truncateTo(truncationState.offset, isFuture = true)
}
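
// Discards the future replica's log entirely and restarts it at the given offset.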
override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
partition.truncateFullyAndStartAt(offset, isFuture = true)
}
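
// Picks the smallest ready-to-fetch partition in (topic, partition) order.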
private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
partitionMap.filter { case (_, partitionFetchState) =>
partitionFetchState.isReadyForFetch
}.reduceLeftOption { (left, right) =>
if ((left._1.topic < right._1.topic) || (left._1.topic == right._1.topic && left._1.partition < right._1.partition))
left
else
right
}
}
private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
// Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
// moving any given replica. Replicas are selected in ascending (topic, partition) order from the
// partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
// becomes unavailable or is removed.
inProgressPartition.foreach { tp =>
val fetchStateOpt = partitionMap.get(tp)
fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState =>
return Some((tp, fetchState))
}
}
inProgressPartition = None
val nextPartitionOpt = nextReadyPartition(partitionMap)
nextPartitionOpt.foreach { case (tp, fetchState) =>
inProgressPartition = Some(tp)
info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " +
s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.")
}
nextPartitionOpt
}
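
// Builds a fetch request for a single partition, using the future replica's log start offset.
// If the future log is unavailable (KafkaStorageException), the partition is returned in
// partitionsWithError instead.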
private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val partitionsWithError = mutable.Set[TopicPartition]()
try {
val logStartOffset = replicaMgr.futureLocalReplicaOrException(tp).logStartOffset
requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset,
fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
} catch {
case e: KafkaStorageException =>
debug(s"Failed to build fetch for $tp", e)
partitionsWithError += tp
}
val fetchRequestOpt = if (requestMap.isEmpty) {
None
} else {
// Set maxWait and minBytes to 0 because the response should return immediately if
// the future log has caught up with the current log of the partition
Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
.setMaxBytes(maxBytes))
}
ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}
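
// Builds the next fetch request, restricted to the single partition currently being moved.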
def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
// Skip building a fetch request while the quota is exceeded.
if (quota.isQuotaExceeded) {
ResultWithPartitions(None, Set.empty)
} else {
selectPartitionToFetch(partitionMap) match {
case Some((tp, fetchState)) =>
buildFetchForPartition(tp, fetchState)
case None =>
ResultWithPartitions(None, Set.empty)
}
}
}
}