/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.io.File
import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection._
/*
 * Result metadata of a log append operation
 */
case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
def error: Errors = exception match {
case None => Errors.NONE
case Some(e) => Errors.forException(e)
}
}
case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) {
def error: Errors = exception match {
case None => Errors.NONE
case Some(e) => Errors.forException(e)
}
}
/*
 * Result metadata of a log read operation
 * @param info @FetchDataInfo returned by the @Log read
 * @param highWatermark high watermark of the local replica
 * @param leaderLogStartOffset log start offset of the leader at the time of the read
 * @param leaderLogEndOffset log end offset of the leader at the time of the read
 * @param followerLogStartOffset log start offset reported by the follower (for follower fetches)
 * @param fetchTimeMs time at which the fetch was received
 * @param readSize maximum number of bytes requested for this read
 * @param lastStableOffset last stable offset of the leader, if available
 * @param exception Exception if error encountered while reading from the log
 */
case class LogReadResult(info: FetchDataInfo,
highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
followerLogStartOffset: Long,
fetchTimeMs: Long,
readSize: Int,
lastStableOffset: Option[Long],
exception: Option[Throwable] = None) {
def error: Errors = exception match {
case None => Errors.NONE
case Some(e) => Errors.forException(e)
}
def withEmptyFetchInfo: LogReadResult =
copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
override def toString =
s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]"
}
case class FetchPartitionData(error: Errors = Errors.NONE,
highWatermark: Long,
logStartOffset: Long,
records: Records,
lastStableOffset: Option[Long],
abortedTransactions: Option[List[AbortedTransaction]])
object LogReadResult {
val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = -1,
lastStableOffset = None)
}
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBlackOut = 5000L
val IsrChangePropagationInterval = 60000L
val OfflinePartition: Partition = new Partition(new TopicPartition("", -1),
isOffline = true,
replicaLagTimeMaxMs = 0L,
localBrokerId = -1,
time = null,
replicaManager = null,
logManager = null,
zkClient = null)
}
class ReplicaManager(val config: KafkaConfig,
metrics: Metrics,
time: Time,
val zkClient: KafkaZkClient,
scheduler: Scheduler,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
quotaManagers: QuotaManagers,
val brokerTopicStats: BrokerTopicStats,
val metadataCache: MetadataCache,
logDirFailureChannel: LogDirFailureChannel,
val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
def this(config: KafkaConfig,
metrics: Metrics,
time: Time,
zkClient: KafkaZkClient,
scheduler: Scheduler,
logManager: LogManager,
isShuttingDown: AtomicBoolean,
quotaManagers: QuotaManagers,
brokerTopicStats: BrokerTopicStats,
metadataCache: MetadataCache,
logDirFailureChannel: LogDirFailureChannel,
threadNamePrefix: Option[String] = None) {
this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown,
quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", brokerId = config.brokerId,
purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", brokerId = config.brokerId,
purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
DelayedOperationPurgatory[DelayedDeleteRecords](
purgatoryName = "DeleteRecords", brokerId = config.brokerId,
purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
threadNamePrefix)
}
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch
private val localBrokerId = config.brokerId
private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
Partition(tp, time, this)))
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
@volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir =>
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap
private var hwThreadInitialized = false
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
private var logDirFailureHandler: LogDirFailureHandler = null
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
override def doWork() {
val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
if (haltBrokerOnDirFailure) {
fatal(s"Halting broker because dir $newOfflineLogDir is offline")
Exit.halt(1)
}
handleLogDirFailure(newOfflineLogDir)
}
}
val leaderCount = newGauge(
"LeaderCount",
new Gauge[Int] {
def value = leaderPartitionsIterator.size
}
)
val partitionCount = newGauge(
"PartitionCount",
new Gauge[Int] {
def value = allPartitions.size
}
)
val offlineReplicaCount = newGauge(
"OfflineReplicaCount",
new Gauge[Int] {
def value = offlinePartitionsIterator.size
}
)
val underReplicatedPartitions = newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
def value = underReplicatedPartitionCount
}
)
val underMinIsrPartitionCount = newGauge(
"UnderMinIsrPartitionCount",
new Gauge[Int] {
def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
}
)
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
def recordIsrChange(topicPartition: TopicPartition) {
isrChangeSet synchronized {
isrChangeSet += topicPartition
lastIsrChangeMs.set(System.currentTimeMillis())
}
}
/**
 * This function periodically runs to see if ISR changes need to be propagated. It propagates ISR changes when:
 * 1. There are ISR changes that have not been propagated yet.
 * 2. There has been no ISR change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
 * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming the controller and
 * other brokers when a large number of ISR changes occur. (An illustrative timing sketch follows this method.)
 */
def maybePropagateIsrChanges() {
val now = System.currentTimeMillis()
isrChangeSet synchronized {
if (isrChangeSet.nonEmpty &&
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
zkClient.propagateIsrChanges(isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}
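// Illustrative timing sketch (editorial example, not part of the original logic): with
// ReplicaManager.IsrChangePropagationBlackOut (5000 ms), ReplicaManager.IsrChangePropagationInterval (60000 ms)
// and the 2500 ms "isr-change-propagation" schedule set up in startup(), a single ISR change recorded at
// t = 0 with no follow-up changes satisfies the black-out condition once t > 5000 ms, so it is written to
// ZooKeeper at the first scheduler tick after that, i.e. at roughly t = 7500 ms. A steady stream of changes
// keeps resetting lastIsrChangeMs, so propagation then falls back to the 60 s interval condition.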
// When the ReplicaAlterLogDirsThread finishes replacing a current replica with a future replica, it will
// remove the partition from its partition state map. But it will not close itself even if the
// partition state map is empty. Thus we need to call shutdownIdleReplicaAlterLogDirsThread() periodically
// to shut down idle ReplicaAlterLogDirsThreads
def shutdownIdleReplicaAlterLogDirsThread(): Unit = {
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
}
def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
/**
 * Try to complete some delayed produce requests with the request key;
 * this can be triggered when:
 *
 * 1. The partition HW has changed (for acks = -1)
 * 2. A follower replica's fetch operation is received (for acks = -1)
 */
def tryCompleteDelayedProduce(key: DelayedOperationKey) {
val completed = delayedProducePurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed))
}
/**
* Try to complete some delayed fetch requests with the request key;
* this can be triggered when:
*
* 1. The partition HW has changed (for regular fetch)
* 2. A new message set is appended to the local log (for follower fetch)
*/
def tryCompleteDelayedFetch(key: DelayedOperationKey) {
val completed = delayedFetchPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
}
/**
 * Try to complete some delayed DeleteRecords requests with the request key;
 * this needs to be triggered when the partition low watermark has changed
 */
def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
}
def startup() {
// start ISR expiration thread
// A follower can lag behind the leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from the ISR
scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS)
// If the inter-broker protocol (IBP) is < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include the isNew field.
// In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
// Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
}
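// Worked example for the 1.5x bound mentioned above (illustrative numbers, assuming a
// replica.lag.time.max.ms of 10000 ms): the "isr-expiration" task then runs every 5000 ms, so a follower
// whose last caught-up time falls just after one check can stay in the ISR for up to roughly
// 10000 + 5000 = 15000 ms before maybeShrinkIsr removes it.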
def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean) = {
stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition")
if (deletePartition) {
val removedPartition = allPartitions.remove(topicPartition)
if (removedPartition eq ReplicaManager.OfflinePartition) {
allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk")
}
if (removedPartition != null) {
val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic)
if (!topicHasPartitions)
brokerTopicStats.removeMetrics(topicPartition.topic)
// this will delete the local log. This call may throw an exception if the log is in an offline directory
removedPartition.delete()
} else {
stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
}
// Delete the log and corresponding folders in case the replica manager doesn't hold them anymore.
// This could happen when the topic is being deleted while the broker is down and recovers.
if (logManager.getLog(topicPartition).isDefined)
logManager.asyncDelete(topicPartition)
if (logManager.getLog(topicPartition, isFuture = true).isDefined)
logManager.asyncDelete(topicPartition, isFuture = true)
}
stateChangeLogger.trace(s"Finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
}
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = {
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
stateChangeLogger.warn("Received stop replica request from an old controller epoch " +
s"${stopReplicaRequest.controllerEpoch}. Latest known controller epoch is $controllerEpoch")
(responseMap, Errors.STALE_CONTROLLER_EPOCH)
} else {
val partitions = stopReplicaRequest.partitions.asScala
controllerEpoch = stopReplicaRequest.controllerEpoch
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager.removeFetcherForPartitions(partitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
for (topicPartition <- partitions){
try {
stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
responseMap.put(topicPartition, Errors.NONE)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for " +
s"partition $topicPartition due to storage exception", e)
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
(responseMap, Errors.NONE)
}
}
}
def getOrCreatePartition(topicPartition: TopicPartition): Partition =
allPartitions.getAndMaybePut(topicPartition)
def getPartition(topicPartition: TopicPartition): Option[Partition] =
Option(allPartitions.get(topicPartition))
def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
private def nonOfflinePartitionsIterator: Iterator[Partition] =
allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
private def offlinePartitionsIterator: Iterator[Partition] =
allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
getPartition(topicPartition) match {
case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
else
partition
case None if metadataCache.contains(topicPartition) =>
if (expectLeader) {
// The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which
// forces clients to refresh metadata to find the new location. This can happen, for example,
// during a partition reassignment if a produce request from the client is sent to a broker after
// the local replica has been deleted.
throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition")
} else {
throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available")
}
case None =>
throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
}
}
def localReplicaOrException(topicPartition: TopicPartition): Replica = {
getPartitionOrException(topicPartition, expectLeader = false).localReplicaOrException
}
def futureLocalReplicaOrException(topicPartition: TopicPartition): Replica = {
getPartitionOrException(topicPartition, expectLeader = false).futureLocalReplicaOrException
}
def futureLocalReplica(topicPartition: TopicPartition): Option[Replica] = {
nonOfflinePartition(topicPartition).flatMap(_.futureLocalReplica)
}
def localReplica(topicPartition: TopicPartition): Option[Replica] = {
nonOfflinePartition(topicPartition).flatMap(_.localReplica)
}
def getLogDir(topicPartition: TopicPartition): Option[String] = {
localReplica(topicPartition).flatMap(_.log).map(_.dir.getParent)
}
/**
 * Append messages to the leader replicas of the partitions, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when the timeout expires or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 * (An illustrative usage sketch follows this method.)
 */
def appendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) {
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
isFromClient = isFromClient, entriesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status
}
recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats))
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
// we can respond immediately
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
responseCallback(produceResponseStatus)
}
} else {
// If required.acks is outside accepted range, something is wrong with the client
// Just return an error and don't handle the request at all
val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
}
responseCallback(responseStatus)
}
}
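// Illustrative usage sketch (editorial; the caller-side names produceRequest, partitionRecords and
// sendProduceResponse are hypothetical): a request handler such as KafkaApis might invoke appendRecords
// for a client produce request roughly as follows, where partitionRecords is a
// Map[TopicPartition, MemoryRecords] holding the request payload and sendProduceResponse turns the
// per-partition PartitionResponse map into a ProduceResponse for the client:
//
//   replicaManager.appendRecords(
//     timeout = produceRequest.timeout.toLong,
//     requiredAcks = produceRequest.acks,
//     internalTopicsAllowed = false,
//     isFromClient = true,
//     entriesPerPartition = partitionRecords,
//     responseCallback = sendProduceResponse)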
/**
 * Delete records on the leader replicas of the partitions, and wait for the delete records operation to be propagated to other replicas;
 * the callback function will be triggered either when the timeout expires or the logStartOffset of all live replicas has reached the specified offset
 */
private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
offsetPerPartition.map { case (topicPartition, requestedOffset) =>
// reject delete records operation on internal topics
if (Topic.isInternal(topicPartition.topic)) {
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
val partition = getPartitionOrException(topicPartition, expectLeader = true)
val logDeleteResult = partition.deleteRecordsOnLeader(requestedOffset)
(topicPartition, logDeleteResult)
} catch {
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: OffsetOutOfRangeException |
_: PolicyViolationException |
_: KafkaStorageException) =>
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
case t: Throwable =>
error("Error processing delete records operation on partition %s".format(topicPartition), t)
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(t)))
}
}
}
}
// If there exists a topic partition that meets both of the following requirements,
// we need to put a delayed DeleteRecordsRequest and wait for the delete records operation to complete
//
// 1. the delete records operation on this partition is successful
// 2. the low watermark of this partition is smaller than the specified offset
private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
localDeleteRecordsResults.exists{ case (_, deleteRecordsResult) =>
deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
}
}
/**
 * For each pair of partition and log directory specified in the map, if the partition has already been created on
 * this broker, move its log files to the specified log directory. Otherwise, record the pair in memory so that
 * the partition will be created in the specified log directory when the broker receives a LeaderAndIsrRequest for the partition later.
 */
def alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = {
replicaStateChangeLock synchronized {
partitionDirs.map { case (topicPartition, destinationDir) =>
try {
if (!logManager.isLogDirOnline(destinationDir))
throw new KafkaStorageException(s"Log directory $destinationDir is offline")
getPartition(topicPartition).foreach { partition =>
if (partition eq ReplicaManager.OfflinePartition)
throw new KafkaStorageException(s"Partition $topicPartition is offline")
// Stop current replica movement if the destinationDir is different from the existing destination log directory
if (partition.futureReplicaDirChanged(destinationDir)) {
replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition))
partition.removeFutureLocalReplica()
}
}
// If the log for this partition has not been created yet:
// 1) Record the destination log directory in memory so that the partition will be created in this log directory
// when the broker receives a LeaderAndIsrRequest for this partition later.
// 2) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse
logManager.maybeUpdatePreferredLogDir(topicPartition, destinationDir)
// throw ReplicaNotAvailableException if the replica does not exist for the given partition
val partition = getPartitionOrException(topicPartition, expectLeader = false)
partition.localReplicaOrException
// If the destinationDir is different from the current log directory of the replica:
// - If there is no offline log directory, create the future log in the destinationDir (if it does not exist) and
// start ReplicaAlterLogDirsThread to move data of this partition from the current log to the future log
// - Otherwise, return KafkaStorageException. We do not create the future log while there is an offline log directory
// so that we can avoid creating a future log for the same partition in multiple log directories.
if (partition.maybeCreateFutureReplica(destinationDir)) {
val futureReplica = futureLocalReplicaOrException(topicPartition)
logManager.abortAndPauseCleaning(topicPartition)
val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
partition.getLeaderEpoch, futureReplica.highWatermark.messageOffset)
replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState))
}
(topicPartition, Errors.NONE)
} catch {
case e@(_: LogDirNotFoundException |
_: ReplicaNotAvailableException |
_: KafkaStorageException) =>
(topicPartition, Errors.forException(e))
case t: Throwable =>
error("Error while changing replica dir for partition %s".format(topicPartition), t)
(topicPartition, Errors.forException(t))
}
}
}
}
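// Illustrative usage sketch (editorial; the topic name and directory below are hypothetical): an
// AlterReplicaLogDirs request handler would pass the requested moves as a map from partition to
// destination log directory and report the per-partition Errors back to the client, e.g.
//
//   val results: Map[TopicPartition, Errors] = replicaManager.alterReplicaLogDirs(
//     Map(new TopicPartition("my-topic", 0) -> "/data/kafka-logs-1"))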
/*
 * Get the LogDirInfo for the specified list of partitions.
 *
 * Each LogDirInfo specifies the following information for a given log directory:
 * 1) Error of the log directory, e.g. whether the log directory is online or offline
 * 2) Size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions
 * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
 */
def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = {
val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent)
config.logDirs.toSet.map { logDir: String =>
val absolutePath = new File(logDir).getAbsolutePath
try {
if (!logManager.isLogDirOnline(absolutePath))
throw new KafkaStorageException(s"Log directory $absolutePath is offline")
logsByDir.get(absolutePath) match {
case Some(logs) =>
val replicaInfos = logs.filter { log =>
partitions.contains(log.topicPartition)
}.map { log =>
log.topicPartition -> new ReplicaInfo(log.size, getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture), log.isFuture)
}.toMap
(absolutePath, new LogDirInfo(Errors.NONE, replicaInfos.asJava))
case None =>
(absolutePath, new LogDirInfo(Errors.NONE, Map.empty[TopicPartition, ReplicaInfo].asJava))
}
} catch {
case _: KafkaStorageException =>
(absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava))
case t: Throwable =>
error(s"Error while describing replica in dir $absolutePath", t)
(absolutePath, new LogDirInfo(Errors.forException(t), Map.empty[TopicPartition, ReplicaInfo].asJava))
}
}.toMap
}
def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = {
localReplica(topicPartition) match {
case Some(replica) =>
if (isFuture)
replica.logEndOffset.messageOffset - logEndOffset
else
math.max(replica.highWatermark.messageOffset - logEndOffset, 0)
case None =>
// return -1L to indicate that the LEO lag is not available if the replica is not created or is offline
DescribeLogDirsResponse.INVALID_OFFSET_LAG
}
}
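// Worked example (editorial, illustrative numbers): for a current log with high watermark 120 and
// log end offset 120, the lag is max(120 - 120, 0) = 0; for a future log that has copied up to offset 80
// while the current replica's log end offset is 120, the lag is 120 - 80 = 40. If the local replica is not
// created or is offline, DescribeLogDirsResponse.INVALID_OFFSET_LAG is returned instead.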
def deleteRecords(timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {
val timeBeforeLocalDeleteRecords = time.milliseconds
val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
topicPartition ->
DeleteRecordsPartitionStatus(
result.requestedOffset, // requested offset
new DeleteRecordsResponse.PartitionResponse(result.lowWatermark, result.error)) // response status
}
if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
// create delayed delete records operation
val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed delete records operation is being created, new
// requests may arrive and hence make this operation completable.
delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
} else {
// we can respond immediately
val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
responseCallback(deleteRecordsResponseStatus)
}
}
// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
private def delayedProduceRequestRequired(requiredAcks: Short,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
requiredAcks == -1 &&
entriesPerPartition.nonEmpty &&
localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}
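// Worked example (editorial, illustrative): for an acks = -1 produce touching three partitions, if all
// three local appends fail then the error count equals the partition count and the response is sent
// immediately; if at least one append succeeds (e.g. two failures out of three), a DelayedProduce is
// created and completes only once the successful partitions are sufficiently replicated or the timeout expires.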
private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
}
/**
* Append the messages to the local replica logs
*/
private def appendToLocalLog(internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
trace(s"Append [$entriesPerPartition] to local log")
entriesPerPartition.map { case (topicPartition, records) =>
brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
// reject appending to internal topics if it is not allowed
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else {
try {
val partition = getPartitionOrException(topicPartition, expectLeader = true)
val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
val numAppendedMessages = info.numMessages
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate
brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")
(topicPartition, LogAppendResult(info))
} catch {
// NOTE: Failed produce requests metric is not incremented for known exceptions since
// it is supposed to indicate unexpected failures of a broker in handling a produce request
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: RecordTooLargeException |
_: RecordBatchTooLargeException |
_: CorruptRecordException |
_: KafkaStorageException |
_: InvalidTimestampException) =>
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
case t: Throwable =>
val logStartOffset = getPartition(topicPartition) match {
case Some(partition) =>
partition.logStartOffset
case _ =>
-1
}
brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
error(s"Error processing append operation on partition $topicPartition", t)
(topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
}
}
}
}
def fetchOffsetForTimestamp(topicPartition: TopicPartition,
timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): TimestampOffset = {
val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
}
def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
timestamp: Long,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = {
val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
}
/**
 * Fetch messages from the leader replica, and wait until enough data can be fetched and returned;
 * the callback function will be triggered either when the timeout expires or the required fetch info is satisfied.
 * (An illustrative usage sketch follows this method.)
 */
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota = UnboundedQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel) {
val isFromFollower = Request.isValidBrokerId(replicaId)
val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
FetchLogEnd
else if (isolationLevel == IsolationLevel.READ_COMMITTED)
FetchTxnCommitted
else
FetchHighWatermark
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
val result = readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = fetchOnlyFromLeader,
fetchIsolation = fetchIsolation,
fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos,
quota = quota)
if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
else result
}
val logReadResults = readFromLog()
// check if this fetch request can be satisfied right away
val logReadResultValues = logReadResults.map { case (_, v) => v }
val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
errorIncurred || (readResult.error != Errors.NONE))
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions)
}
responseCallback(fetchPartitionData)
} else {
// construct the fetch results from the read results
val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
val fetchInfo = fetchInfos.collectFirst {
case (tp, v) if tp == topicPartition => v
}.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
(topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
}
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}
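// Illustrative usage sketch (editorial; the caller-side names fetchRequest, fetchInfos, quota and
// sendFetchResponse are hypothetical): a handler serving a fetch request might call fetchMessages roughly
// as follows, where fetchInfos pairs each TopicPartition with its FetchRequest.PartitionData and
// sendFetchResponse builds the FetchResponse from the per-partition FetchPartitionData:
//
//   replicaManager.fetchMessages(
//     timeout = fetchRequest.maxWait.toLong,
//     replicaId = fetchRequest.replicaId,
//     fetchMinBytes = fetchRequest.minBytes,
//     fetchMaxBytes = fetchRequest.maxBytes,
//     hardMaxBytesLimit = fetchRequest.version <= 2,
//     fetchInfos = fetchInfos,
//     quota = quota,
//     responseCallback = sendFetchResponse,
//     isolationLevel = fetchRequest.isolationLevel)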
/**
* Read from multiple topic partitions at the given offset up to maxSize bytes
*/
def readFromLocalLog(replicaId: Int,
fetchOnlyFromLeader: Boolean,
fetchIsolation: FetchIsolation,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
try {
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
val fetchTimeMs = time.milliseconds
// Try the read first, this tells us whether we need all of adjustedMaxBytes for this partition
val readInfo = partition.readRecords(
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
fetchIsolation = fetchIsolation,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
// If the partition is being throttled, simply return an empty set.
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else {
readInfo.fetchedData
}
LogReadResult(info = fetchDataInfo,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
readSize = adjustedMaxBytes,
lastStableOffset = Some(readInfo.lastStableOffset),
exception = None)
} catch {
// NOTE: Failed fetch requests metric is not incremented for known exceptions since it
// is supposed to indicate unexpected failure of a broker in handling a fetch request
case e@ (_: UnknownTopicOrPartitionException |
_: NotLeaderForPartitionException |
_: UnknownLeaderEpochException |
_: FencedLeaderEpochException |
_: ReplicaNotAvailableException |
_: KafkaStorageException |
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = 0,
lastStableOffset = None,
exception = Some(e))
case e: Throwable =>
brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
val fetchSource = Request.describeReplicaId(replicaId)
error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
s"on partition $tp: $fetchInfo", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = 0,
lastStableOffset = None,
exception = Some(e))
}
}
var limitBytes = fetchMaxBytes
val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
var minOneMessage = !hardMaxBytesLimit
readPartitionInfo.foreach { case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
val recordBatchSize = readResult.info.records.sizeInBytes
// Once we read from a non-empty partition, we stop ignoring request and partition level size limits
if (recordBatchSize > 0)
minOneMessage = false
limitBytes = math.max(0, limitBytes - recordBatchSize)
result += (tp -> readResult)
}
result
}
/**
* To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
*/
def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
val isReplicaInSync = nonOfflinePartition(topicPartition).exists { partition =>
partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains)
}
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
}
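// Worked example (editorial, illustrative): if a partition is listed in the leader's throttled replica
// list and the leader replication quota is currently exceeded, a fetch from a follower that is still
// catching up (not yet in the ISR) is throttled and readFromLocalLog returns an empty record set for it,
// while a fetch from an in-sync follower is served normally so that throttling never shrinks the ISR.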
def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localReplica(topicPartition).flatMap(_.log.map(_.config))
def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value)
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
val stateControllerEpochErrorMessage = s"Received update metadata request with correlation id $correlationId " +
s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. " +
s"Latest known controller epoch is $controllerEpoch"
stateChangeLogger.warn(stateControllerEpochErrorMessage)
throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
} else {
val deletedPartitions = metadataCache.updateMetadata(correlationId, updateMetadataRequest)
controllerEpoch = updateMetadataRequest.controllerEpoch
deletedPartitions
}
}
}
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " +
s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " +
s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition $topicPartition")
}
replicaStateChangeLock synchronized {
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " +
s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
s"Latest known controller epoch is $controllerEpoch")
leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
val responseMap = new mutable.HashMap[TopicPartition, Errors]
val controllerId = leaderAndIsrRequest.controllerId
controllerEpoch = leaderAndIsrRequest.controllerEpoch
// First check partition's leader epoch
val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
val newPartitions = new mutable.HashSet[Partition]
leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
val partition = getPartition(topicPartition).getOrElse {
val createdPartition = getOrCreatePartition(topicPartition)
newPartitions.add(createdPartition)
createdPartition
}
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch
if (partition eq ReplicaManager.OfflinePartition) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
"partition is in an offline log directory")
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
} else if (requestLeaderEpoch > currentLeaderEpoch) {
// If the leader epoch is valid, record the epoch of the controller that made the leadership decision.
// This is useful while updating the ISR to maintain the decision-maker controller's epoch in the ZooKeeper path
if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
partitionState.put(partition, stateInfo)
else {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}")
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
} else {
// Otherwise record the error code in response
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
s"controller $controllerId with correlation id $correlationId " +
s"epoch $controllerEpoch for partition $topicPartition since its associated " +
s"leader epoch $requestLeaderEpoch is not higher than the current " +
s"leader epoch $currentLeaderEpoch")
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
}
}
val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
stateInfo.basePartitionState.leader == localBrokerId
}
val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
else
Set.empty[Partition]
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
else
Set.empty[Partition]
leaderAndIsrRequest.partitionStates.asScala.keys.foreach { topicPartition =>
/*
 * If there is an offline log directory, a Partition object may have been created by getOrCreatePartition()
 * before getOrCreateReplica() failed to create the local replica due to a KafkaStorageException.
 * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
 * We need to map this topic-partition to OfflinePartition instead.
 */
if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition))
allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
}
// We initialize the high watermark thread after the first LeaderAndIsrRequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing, thereby avoiding weird race conditions
if (!hwThreadInitialized) {
startHighWaterMarksCheckPointThread()
hwThreadInitialized = true
}
val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
for (partition <- newPartitions) {
val topicPartition = partition.topicPartition
if (logManager.getLog(topicPartition, isFuture = true).isDefined) {
partition.localReplica.foreach { replica =>
val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
// Add future replica to partition's map
partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false)
// pause cleaning for partitions that are being moved and start ReplicaAlterLogDirsThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader,
partition.getLeaderEpoch, replica.highWatermark.messageOffset))
}
}
}
replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
}
}
}
/*
 * Make the current broker become the leader for a given set of partitions by:
 *
 * 1. Stop fetchers for these partitions
 * 2. Update the partition metadata in cache
 * 3. Add these partitions to the leader partitions set
 *
 * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
 * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
 * it returns the set of partitions that were made leaders by this method
 *
 * TODO: the above may need to be fixed later
 */
private def makeLeaders(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
partitionState.keys.foreach { partition =>
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " +
s"controller $controllerId epoch $epoch starting the become-leader transition for " +
s"partition ${partition.topicPartition}")
}
for (partition <- partitionState.keys)
responseMap.put(partition.topicPartition, Errors.NONE)
val partitionsToMakeLeaders = mutable.Set[Partition]()
try {
// First stop fetchers for all the partitions
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
// Update the partition information to be the leader
partitionState.foreach{ case (partition, partitionStateInfo) =>
try {
if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
partitionsToMakeLeaders += partition
stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})")
} else
stateChangeLogger.info(s"Skipped the become-leader state change after marking its " +
s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " +
s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
s"since it is already the leader for the partition.")
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Skipped the become-leader state change with " +
s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " +
s"the replica for the partition is offline due to disk error $e")
val dirOpt = getLogDir(partition.topicPartition)
error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e)
responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
} catch {
case e: Throwable =>
partitionState.keys.foreach { partition =>
stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " +
s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e)
}
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionState.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}")
}
partitionsToMakeLeaders
}
/*
 * Make the current broker become a follower for a given set of partitions by:
 *
 * 1. Remove these partitions from the leader partitions set.
 * 2. Mark the replicas as followers so that no more data can be added from the producer clients.
 * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
 * 4. Truncate the log and checkpoint offsets for these partitions.
 * 5. Clear the produce and fetch requests in the purgatory
 * 6. If the broker is not shutting down, add the fetcher to the new leaders.
 *
 * The ordering of these steps makes sure that the replicas in transition will not
 * take any more messages before checkpointing offsets so that all messages before the checkpoint
 * are guaranteed to be flushed to disk
 *
 * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
 * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
 * it returns the set of partitions that were made followers by this method
 */
private def makeFollowers(controllerId: Int,
epoch: Int,
partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
partitionStates.foreach { case (partition, partitionState) =>
stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionState.basePartitionState.leader}")
}
for (partition <- partitionStates.keys)
responseMap.put(partition.topicPartition, Errors.NONE)
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
// TODO: Delete leaders from LeaderAndIsrRequest
partitionStates.foreach { case (partition, partitionStateInfo) =>
val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
try {
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some(_) =>
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
partitionsToMakeFollower += partition
else
stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
s"for partition ${partition.topicPartition} (last update " +
s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
s"since the new leader $newLeaderBrokerId is the same as the old leader")
case None =>
// The leader broker should always be present in the metadata cache.
// If not, we should record the error message and abort the transition process for this partition
stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew)
}
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " +
s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
val dirOpt = getLogDir(partition.topicPartition)
error(s"Error while making broker the follower for partition $partition with leader " +
s"$newLeaderBrokerId in dir $dirOpt", e)
responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
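// Step 3 above: stop the fetchers for these partitions so that the replica fetcher threads
// cannot append any more data to them.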
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
s"${partitionStates(partition).basePartitionState.leader}")
}
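// Step 5 above: try to complete the delayed produce and fetch requests waiting on these
// partitions, since this broker is no longer their leader.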
partitionsToMakeFollower.foreach { partition =>
val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
tryCompleteDelayedProduce(topicPartitionOperationKey)
tryCompleteDelayedFetch(topicPartitionOperationKey)
}
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}")
}
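// Step 6 above: unless the broker is shutting down, start fetching from the new leaders,
// beginning at the local replica's current high watermark.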
if (isShuttingDown.get()) {
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " +
"since it is shutting down")
}
}
else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
.brokerEndPoint(config.interBrokerListenerName)
val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset
partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
}.toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}")
}
}
} catch {
case e: Throwable =>
stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
s"received from controller $controllerId epoch $epoch", e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionStates.keys.foreach { partition =>
stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
s"${partitionStates(partition).basePartitionState.leader}")
}
partitionsToMakeFollower
}
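/**
* Shrink the ISR of partitions whose followers have been lagging behind the leader for longer
* than `replica.lag.time.max.ms`. Invoked periodically by the `isr-expiration` scheduler task.
*/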
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}
/**
* Update the follower's fetch state on the leader based on the last fetch request, and update
* `readResult` only if the follower replica is not recognized as one of the assigned replicas.
* Do not update `readResult` otherwise, so that the log start/end offsets and high watermark stay
* consistent with the records in the fetch response. The log start/end offsets and high watermark
* may change for reasons other than this fetch request; for example, rolling a new log segment or
* removing an old one may advance the log start offset past the last offset in the fetched records.
* The followers will get the updated leader state in the next fetch response.
*/
private def updateFollowerLogReadResults(replicaId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
debug(s"Recording follower broker $replicaId log end offsets: $readResults")
readResults.map { case (topicPartition, readResult) =>
var updatedReadResult = readResult
nonOfflinePartition(topicPartition) match {
case Some(partition) =>
partition.getReplica(replicaId) match {
case Some(replica) =>
partition.updateReplicaLogReadResult(replica, readResult)
case None =>
warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +
s"for partition $topicPartition. Empty records will be returned for this partition.")
updatedReadResult = readResult.withEmptyFetchInfo
}
case None =>
warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")
}
topicPartition -> updatedReadResult
}
}
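// Iterator over the online partitions for which this broker is currently the leader.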
private def leaderPartitionsIterator: Iterator[Partition] =
nonOfflinePartitionsIterator.filter(_.leaderReplicaIfLocal.isDefined)
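// Returns the log end offset of the local replica if this broker leads an online partition, or None otherwise.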
def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
// Flushes the high watermark values of all partitions to the high watermark checkpoint file of each log directory
def checkpointHighWatermarks() {
val replicas = nonOfflinePartitionsIterator.flatMap { partition =>
val replicasList: mutable.Set[Replica] = mutable.Set()
partition.localReplica.foreach(replicasList.add)
partition.futureLocalReplica.foreach(replicasList.add)
replicasList
}.filter(_.log.isDefined).toBuffer
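// Group the replicas by log directory so that the checkpoint file of each directory is written only once.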
val replicasByDir = replicas.groupBy(_.log.get.dir.getParent)
for ((dir, reps) <- replicasByDir) {
val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap
try {
highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
} catch {
case e: KafkaStorageException =>
error(s"Error while writing to highwatermark file in directory $dir", e)
}
}
}
// Used only by tests
def markPartitionOffline(tp: TopicPartition) {
allPartitions.put(tp, ReplicaManager.OfflinePartition)
}
// logDir should be an absolute path
// the sendZkNotification flag exists so that unit tests can skip the ZooKeeper notification
def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true) {
if (!logManager.isLogDirOnline(dir))
return
info(s"Stopping serving replicas in dir $dir")
replicaStateChangeLock synchronized {
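// Partitions whose current local replica lives in the failed log directory must be marked offline.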
val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition =>
partition.localReplica.exists { replica =>
replica.log.isDefined && replica.log.get.dir.getParent == dir
}
}.map(_.topicPartition).toSet
val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition =>
partition.futureLocalReplica.exists { replica =>
replica.log.isDefined && replica.log.get.dir.getParent == dir
}
}.toSet
replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))
partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
newOfflinePartitions.foreach { topicPartition =>
val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
partition.removePartitionMetrics()
}
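// Remove per-topic metrics for topics that no longer have any partition hosted on this broker.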
newOfflinePartitions.map(_.topic).foreach { topic: String =>
val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
if (!topicHasPartitions)
brokerTopicStats.removeMetrics(topic)
}
highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
info(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " +
s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.")
}
logManager.handleLogDirFailure(dir)
if (sendZkNotification)
zkClient.propagateLogDirEvent(localBrokerId)
info(s"Stopped serving replicas in dir $dir")
}
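// Remove the replica manager's gauges; called as part of shutdown.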
def removeMetrics() {
removeMetric("LeaderCount")
removeMetric("PartitionCount")
removeMetric("OfflineReplicaCount")
removeMetric("UnderReplicatedPartitions")
removeMetric("UnderMinIsrPartitionCount")
}
// checkpointHW should only be set to false by unit tests; in all other cases high watermarks are checkpointed on shutdown
def shutdown(checkpointHW: Boolean = true) {
info("Shutting down")
removeMetrics()
if (logDirFailureHandler != null)
logDirFailureHandler.shutdown()
replicaFetcherManager.shutdown()
replicaAlterLogDirsManager.shutdown()
delayedFetchPurgatory.shutdown()
delayedProducePurgatory.shutdown()
delayedDeleteRecordsPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
info("Shut down completely")
}
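// Factory for the replica fetcher manager; protected so that tests can substitute their own implementation.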
protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
}
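// Factory for the replica alter-log-dirs manager; protected for the same reason as createReplicaFetcherManager.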
protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats)
}
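/**
* For each requested partition, look up the end offset of the requested leader epoch. Used when
* handling OffsetsForLeaderEpoch requests.
*
* Returns KAFKA_STORAGE_ERROR for partitions that are marked offline, NOT_LEADER_FOR_PARTITION for
* partitions that are known from the metadata cache but have no replica on this broker, and
* UNKNOWN_TOPIC_OR_PARTITION for partitions this broker knows nothing about.
*/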
def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = {
requestedEpochInfo.map { case (tp, partitionData) =>
val epochEndOffset = getPartition(tp) match {
case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition)
new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
else
partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch,
fetchOnlyFromLeader = true)
case None if metadataCache.contains(tp) =>
new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
case None =>
new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
}
tp -> epochEndOffset
}
}
}