| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.server |
| |
| import java.io.File |
| import java.util.Optional |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} |
| import java.util.concurrent.locks.Lock |
| |
| import com.yammer.metrics.core.Gauge |
| import kafka.api._ |
| import kafka.cluster.{BrokerEndPoint, Partition, Replica} |
| import kafka.controller.{KafkaController, StateChangeLogger} |
| import kafka.log._ |
| import kafka.metrics.KafkaMetricsGroup |
| import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} |
| import kafka.server.checkpoints.OffsetCheckpointFile |
| import kafka.utils._ |
| import kafka.zk.KafkaZkClient |
| import org.apache.kafka.common.errors._ |
| import org.apache.kafka.common.internals.Topic |
| import org.apache.kafka.common.metrics.Metrics |
| import org.apache.kafka.common.protocol.Errors |
| import org.apache.kafka.common.record._ |
| import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction |
| import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo} |
| import org.apache.kafka.common.requests.EpochEndOffset._ |
| import org.apache.kafka.common.requests.FetchRequest.PartitionData |
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse |
| import org.apache.kafka.common.requests._ |
| import org.apache.kafka.common.utils.Time |
| import org.apache.kafka.common.TopicPartition |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection._ |
| |
| /* |
| * Result metadata of a log append operation |
| */ |
| case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) { |
| def error: Errors = exception match { |
| case None => Errors.NONE |
| case Some(e) => Errors.forException(e) |
| } |
| } |
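| |
| // For illustration: an append that fails with a NotLeaderForPartitionException yields |
| // error == Errors.NOT_LEADER_FOR_PARTITION via Errors.forException, while exception = None yields Errors.NONE. |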
| |
| case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) { |
| def error: Errors = exception match { |
| case None => Errors.NONE |
| case Some(e) => Errors.forException(e) |
| } |
| } |
| |
| /* |
| * Result metadata of a log read operation on the log |
| * @param info @FetchDataInfo returned by the @Log read |
| * @param highWatermark high watermark of the local replica |
| * @param leaderLogStartOffset log start offset of the leader |
| * @param leaderLogEndOffset log end offset of the leader |
| * @param followerLogStartOffset log start offset of the follower, taken from the fetch request |
| * @param fetchTimeMs time at which the read was initiated |
| * @param readSize amount of data that was read from the log i.e. size of the fetch |
| * @param lastStableOffset current last stable offset, or None if the read failed |
| * @param exception Exception if an error was encountered while reading from the log |
| */ |
| case class LogReadResult(info: FetchDataInfo, |
| highWatermark: Long, |
| leaderLogStartOffset: Long, |
| leaderLogEndOffset: Long, |
| followerLogStartOffset: Long, |
| fetchTimeMs: Long, |
| readSize: Int, |
| lastStableOffset: Option[Long], |
| exception: Option[Throwable] = None) { |
| |
| def error: Errors = exception match { |
| case None => Errors.NONE |
| case Some(e) => Errors.forException(e) |
| } |
| |
| def withEmptyFetchInfo: LogReadResult = |
| copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)) |
| |
| override def toString = |
| s"Fetch Data: [$info], HW: [$highWatermark], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " + |
| s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], lastStableOffset: [$lastStableOffset], error: [$error]" |
| |
| } |
| |
| case class FetchPartitionData(error: Errors = Errors.NONE, |
| highWatermark: Long, |
| logStartOffset: Long, |
| records: Records, |
| lastStableOffset: Option[Long], |
| abortedTransactions: Option[List[AbortedTransaction]]) |
| |
| object LogReadResult { |
| val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), |
| highWatermark = -1L, |
| leaderLogStartOffset = -1L, |
| leaderLogEndOffset = -1L, |
| followerLogStartOffset = -1L, |
| fetchTimeMs = -1L, |
| readSize = -1, |
| lastStableOffset = None) |
| } |
| |
| object ReplicaManager { |
| val HighWatermarkFilename = "replication-offset-checkpoint" |
| val IsrChangePropagationBlackOut = 5000L |
| val IsrChangePropagationInterval = 60000L |
| val OfflinePartition: Partition = new Partition(new TopicPartition("", -1), |
| isOffline = true, |
| replicaLagTimeMaxMs = 0L, |
| localBrokerId = -1, |
| time = null, |
| replicaManager = null, |
| logManager = null, |
| zkClient = null) |
| } |
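| |
| // OfflinePartition above is a sentinel compared by reference identity (`eq`/`ne`), never by `equals`. |
| // A minimal sketch of the pattern (replicaManager and tp are hypothetical placeholders): |
| // {{{ |
| // val partition = replicaManager.getPartition(tp) // Option[Partition] |
| // partition.foreach { p => |
| //   if (p eq ReplicaManager.OfflinePartition) |
| //     throw new KafkaStorageException(s"Partition $tp is in an offline log directory") |
| // } |
| // }}} |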
| |
| class ReplicaManager(val config: KafkaConfig, |
| metrics: Metrics, |
| time: Time, |
| val zkClient: KafkaZkClient, |
| scheduler: Scheduler, |
| val logManager: LogManager, |
| val isShuttingDown: AtomicBoolean, |
| quotaManagers: QuotaManagers, |
| val brokerTopicStats: BrokerTopicStats, |
| val metadataCache: MetadataCache, |
| logDirFailureChannel: LogDirFailureChannel, |
| val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], |
| val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], |
| val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], |
| threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup { |
| |
| def this(config: KafkaConfig, |
| metrics: Metrics, |
| time: Time, |
| zkClient: KafkaZkClient, |
| scheduler: Scheduler, |
| logManager: LogManager, |
| isShuttingDown: AtomicBoolean, |
| quotaManagers: QuotaManagers, |
| brokerTopicStats: BrokerTopicStats, |
| metadataCache: MetadataCache, |
| logDirFailureChannel: LogDirFailureChannel, |
| threadNamePrefix: Option[String] = None) { |
| this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown, |
| quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel, |
| DelayedOperationPurgatory[DelayedProduce]( |
| purgatoryName = "Produce", brokerId = config.brokerId, |
| purgeInterval = config.producerPurgatoryPurgeIntervalRequests), |
| DelayedOperationPurgatory[DelayedFetch]( |
| purgatoryName = "Fetch", brokerId = config.brokerId, |
| purgeInterval = config.fetchPurgatoryPurgeIntervalRequests), |
| DelayedOperationPurgatory[DelayedDeleteRecords]( |
| purgatoryName = "DeleteRecords", brokerId = config.brokerId, |
| purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests), |
| threadNamePrefix) |
| } |
| |
| /* epoch of the controller that last changed the leader */ |
| @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch |
| private val localBrokerId = config.brokerId |
| private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp => |
| Partition(tp, time, this))) |
| private val replicaStateChangeLock = new Object |
| val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower) |
| val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager(quotaManagers.alterLogDirs, brokerTopicStats) |
| private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) |
| @volatile var highWatermarkCheckpoints = logManager.liveLogDirs.map(dir => |
| (dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap |
| |
| private var hwThreadInitialized = false |
| this.logIdent = s"[ReplicaManager broker=$localBrokerId] " |
| private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None) |
| |
| private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]() |
| private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis()) |
| private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis()) |
| |
| private var logDirFailureHandler: LogDirFailureHandler = null |
| |
| private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) { |
| override def doWork() { |
| val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir() |
| if (haltBrokerOnDirFailure) { |
| fatal(s"Halting broker because dir $newOfflineLogDir is offline") |
| Exit.halt(1) |
| } |
| handleLogDirFailure(newOfflineLogDir) |
| } |
| } |
| |
| val leaderCount = newGauge( |
| "LeaderCount", |
| new Gauge[Int] { |
| def value = leaderPartitionsIterator.size |
| } |
| ) |
| val partitionCount = newGauge( |
| "PartitionCount", |
| new Gauge[Int] { |
| def value = allPartitions.size |
| } |
| ) |
| val offlineReplicaCount = newGauge( |
| "OfflineReplicaCount", |
| new Gauge[Int] { |
| def value = offlinePartitionsIterator.size |
| } |
| ) |
| val underReplicatedPartitions = newGauge( |
| "UnderReplicatedPartitions", |
| new Gauge[Int] { |
| def value = underReplicatedPartitionCount |
| } |
| ) |
| val underMinIsrPartitionCount = newGauge( |
| "UnderMinIsrPartitionCount", |
| new Gauge[Int] { |
| def value = leaderPartitionsIterator.count(_.isUnderMinIsr) |
| } |
| ) |
| |
| val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) |
| val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) |
| val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS) |
| |
| def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated) |
| |
| def startHighWaterMarksCheckPointThread() = { |
| if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) |
| scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks _, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS) |
| } |
| |
| def recordIsrChange(topicPartition: TopicPartition) { |
| isrChangeSet synchronized { |
| isrChangeSet += topicPartition |
| lastIsrChangeMs.set(System.currentTimeMillis()) |
| } |
| } |
| /** |
| * This function periodically runs to see if ISR changes need to be propagated. It propagates ISR changes when: |
| * 1. There is an ISR change that has not been propagated yet. |
| * 2. There has been no ISR change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation. |
| * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming the controller and |
| * other brokers when a large number of ISR changes occur. |
| */ |
| def maybePropagateIsrChanges() { |
| val now = System.currentTimeMillis() |
| isrChangeSet synchronized { |
| if (isrChangeSet.nonEmpty && |
| (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now || |
| lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) { |
| zkClient.propagateIsrChanges(isrChangeSet) |
| isrChangeSet.clear() |
| lastIsrPropagationMs.set(now) |
| } |
| } |
| } |
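| |
| // A hedged sketch of the propagation condition above, extracted for illustration only (the |
| // shouldPropagate helper is hypothetical and not part of this class): |
| // {{{ |
| // def shouldPropagate(now: Long, lastChangeMs: Long, lastPropagationMs: Long, pending: Boolean): Boolean = |
| //   pending && (lastChangeMs + 5000L < now || lastPropagationMs + 60000L < now) |
| // |
| // shouldPropagate(now = 6000L, lastChangeMs = 1500L, lastPropagationMs = 0L, pending = true) // false: within blackout |
| // shouldPropagate(now = 7000L, lastChangeMs = 1500L, lastPropagationMs = 0L, pending = true) // true: blackout elapsed |
| // }}} |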
| |
| // When ReplicaAlterDirThread finishes replacing a current replica with a future replica, it will |
| // remove the partition from the partition state map. But it will not close itself even if the |
| // partition state map is empty. Thus we need to call shutdownIdleReplicaAlterLogDirsThread() periodically |
| // to shut down idle ReplicaAlterDirThreads |
| def shutdownIdleReplicaAlterLogDirsThread(): Unit = { |
| replicaAlterLogDirsManager.shutdownIdleFetcherThreads() |
| } |
| |
| def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition) |
| |
| /** |
| * Try to complete some delayed produce requests with the request key; |
| * this can be triggered when: |
| * |
| * 1. The partition HW has changed (for acks = -1) |
| * 2. A follower replica's fetch operation is received (for acks = -1) |
| */ |
| def tryCompleteDelayedProduce(key: DelayedOperationKey) { |
| val completed = delayedProducePurgatory.checkAndComplete(key) |
| debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) |
| } |
| |
| /** |
| * Try to complete some delayed fetch requests with the request key; |
| * this can be triggered when: |
| * |
| * 1. The partition HW has changed (for regular fetch) |
| * 2. A new message set is appended to the local log (for follower fetch) |
| */ |
| def tryCompleteDelayedFetch(key: DelayedOperationKey) { |
| val completed = delayedFetchPurgatory.checkAndComplete(key) |
| debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) |
| } |
| |
| /** |
| * Try to complete some delayed DeleteRecordsRequest with the request key; |
| * this needs to be triggered when the partition low watermark has changed |
| */ |
| def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) { |
| val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key) |
| debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed)) |
| } |
| |
| def startup() { |
| // start ISR expiration thread |
| // A follower can lag behind the leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from the ISR |
| scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS) |
| scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS) |
| scheduler.schedule("shutdown-idle-replica-alter-log-dirs-thread", shutdownIdleReplicaAlterLogDirsThread _, period = 10000L, unit = TimeUnit.MILLISECONDS) |
| |
| // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field. |
| // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed. |
| // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0 |
| val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0 |
| logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure) |
| logDirFailureHandler.start() |
| } |
| |
| def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean) = { |
| stateChangeLogger.trace(s"Handling stop replica (delete=$deletePartition) for partition $topicPartition") |
| |
| if (deletePartition) { |
| val removedPartition = allPartitions.remove(topicPartition) |
| if (removedPartition eq ReplicaManager.OfflinePartition) { |
| allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) |
| throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk") |
| } |
| |
| if (removedPartition != null) { |
| val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic) |
| if (!topicHasPartitions) |
| brokerTopicStats.removeMetrics(topicPartition.topic) |
| // this will delete the local log. This call may throw an exception if the log is in an offline directory |
| removedPartition.delete() |
| } else { |
| stateChangeLogger.trace(s"Ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker") |
| } |
| |
| // Delete the log and corresponding folders in case the replica manager doesn't hold them anymore. |
| // This could happen when a topic is being deleted while the broker is down and then recovers. |
| if (logManager.getLog(topicPartition).isDefined) |
| logManager.asyncDelete(topicPartition) |
| if (logManager.getLog(topicPartition, isFuture = true).isDefined) |
| logManager.asyncDelete(topicPartition, isFuture = true) |
| } |
| stateChangeLogger.trace(s"Finished handling stop replica (delete=$deletePartition) for partition $topicPartition") |
| } |
| |
| def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = { |
| replicaStateChangeLock synchronized { |
| val responseMap = new collection.mutable.HashMap[TopicPartition, Errors] |
| if (stopReplicaRequest.controllerEpoch() < controllerEpoch) { |
| stateChangeLogger.warn("Received stop replica request from an old controller epoch " + |
| s"${stopReplicaRequest.controllerEpoch}. Latest known controller epoch is $controllerEpoch") |
| (responseMap, Errors.STALE_CONTROLLER_EPOCH) |
| } else { |
| val partitions = stopReplicaRequest.partitions.asScala |
| controllerEpoch = stopReplicaRequest.controllerEpoch |
| // First stop fetchers for all partitions, then stop the corresponding replicas |
| replicaFetcherManager.removeFetcherForPartitions(partitions) |
| replicaAlterLogDirsManager.removeFetcherForPartitions(partitions) |
| for (topicPartition <- partitions){ |
| try { |
| stopReplica(topicPartition, stopReplicaRequest.deletePartitions) |
| responseMap.put(topicPartition, Errors.NONE) |
| } catch { |
| case e: KafkaStorageException => |
| stateChangeLogger.error(s"Ignoring stop replica (delete=${stopReplicaRequest.deletePartitions}) for " + |
| s"partition $topicPartition due to storage exception", e) |
| responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) |
| } |
| } |
| (responseMap, Errors.NONE) |
| } |
| } |
| } |
| |
| def getOrCreatePartition(topicPartition: TopicPartition): Partition = |
| allPartitions.getAndMaybePut(topicPartition) |
| |
| def getPartition(topicPartition: TopicPartition): Option[Partition] = |
| Option(allPartitions.get(topicPartition)) |
| |
| def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = |
| getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition) |
| |
| private def nonOfflinePartitionsIterator: Iterator[Partition] = |
| allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition) |
| |
| private def offlinePartitionsIterator: Iterator[Partition] = |
| allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition) |
| |
| |
| def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = { |
| getPartition(topicPartition) match { |
| case Some(partition) => |
| if (partition eq ReplicaManager.OfflinePartition) |
| throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory") |
| else |
| partition |
| |
| case None if metadataCache.contains(topicPartition) => |
| if (expectLeader) { |
| // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which |
| // forces clients to refresh metadata to find the new location. This can happen, for example, |
| // during a partition reassignment if a produce request from the client is sent to a broker after |
| // the local replica has been deleted. |
| throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition") |
| } else { |
| throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available") |
| } |
| |
| case None => |
| throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist") |
| } |
| } |
| |
| def localReplicaOrException(topicPartition: TopicPartition): Replica = { |
| getPartitionOrException(topicPartition, expectLeader = false).localReplicaOrException |
| } |
| |
| def futureLocalReplicaOrException(topicPartition: TopicPartition): Replica = { |
| getPartitionOrException(topicPartition, expectLeader = false).futureLocalReplicaOrException |
| } |
| |
| def futureLocalReplica(topicPartition: TopicPartition): Option[Replica] = { |
| nonOfflinePartition(topicPartition).flatMap(_.futureLocalReplica) |
| } |
| |
| def localReplica(topicPartition: TopicPartition): Option[Replica] = { |
| nonOfflinePartition(topicPartition).flatMap(_.localReplica) |
| } |
| |
| def getLogDir(topicPartition: TopicPartition): Option[String] = { |
| localReplica(topicPartition).flatMap(_.log).map(_.dir.getParent) |
| } |
| |
| /** |
| * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas; |
| * the callback function will be triggered either when the timeout expires or the required acks are satisfied; |
| * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock. |
| */ |
| def appendRecords(timeout: Long, |
| requiredAcks: Short, |
| internalTopicsAllowed: Boolean, |
| isFromClient: Boolean, |
| entriesPerPartition: Map[TopicPartition, MemoryRecords], |
| responseCallback: Map[TopicPartition, PartitionResponse] => Unit, |
| delayedProduceLock: Option[Lock] = None, |
| recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()) { |
| if (isValidRequiredAcks(requiredAcks)) { |
| val sTime = time.milliseconds |
| val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, |
| isFromClient = isFromClient, entriesPerPartition, requiredAcks) |
| debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) |
| |
| val produceStatus = localProduceResults.map { case (topicPartition, result) => |
| topicPartition -> |
| ProducePartitionStatus( |
| result.info.lastOffset + 1, // required offset |
| new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime, result.info.logStartOffset)) // response status |
| } |
| |
| recordConversionStatsCallback(localProduceResults.mapValues(_.info.recordConversionStats)) |
| |
| if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) { |
| // create delayed produce operation |
| val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) |
| val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock) |
| |
| // create a list of (topic, partition) pairs to use as keys for this delayed produce operation |
| val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq |
| |
| // try to complete the request immediately, otherwise put it into the purgatory |
| // this is because while the delayed produce operation is being created, new |
| // requests may arrive and hence make this operation completable. |
| delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) |
| |
| } else { |
| // we can respond immediately |
| val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus) |
| responseCallback(produceResponseStatus) |
| } |
| } else { |
| // If required.acks is outside accepted range, something is wrong with the client |
| // Just return an error and don't handle the request at all |
| val responseStatus = entriesPerPartition.map { case (topicPartition, _) => |
| topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS, |
| LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset) |
| } |
| responseCallback(responseStatus) |
| } |
| } |
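| |
| // A hedged usage sketch of appendRecords (replicaManager, the topic name and the callback body are |
| // illustrative placeholders, not part of this file): |
| // {{{ |
| // val records = MemoryRecords.withRecords(CompressionType.NONE, |
| //   new SimpleRecord("key".getBytes, "value".getBytes)) |
| // replicaManager.appendRecords( |
| //   timeout = 30000L, |
| //   requiredAcks = -1, |
| //   internalTopicsAllowed = false, |
| //   isFromClient = true, |
| //   entriesPerPartition = Map(new TopicPartition("topic", 0) -> records), |
| //   responseCallback = responses => responses.foreach { case (tp, response) => |
| //     println(s"$tp -> ${response.error}") |
| //   }) |
| // }}} |
| // With requiredAcks = -1 the response is deferred through the delayed produce purgatory until the |
| // full ISR has replicated the appended records or the timeout expires. |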
| |
| /** |
| * Delete records on leader replicas of the partition, and wait for the delete records operation to be propagated to other replicas; |
| * the callback function will be triggered either when the timeout expires or the logStartOffset of all live replicas has reached the specified offset |
| */ |
| private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = { |
| trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition)) |
| offsetPerPartition.map { case (topicPartition, requestedOffset) => |
| // reject delete records operation on internal topics |
| if (Topic.isInternal(topicPartition.topic)) { |
| (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}")))) |
| } else { |
| try { |
| val partition = getPartitionOrException(topicPartition, expectLeader = true) |
| val logDeleteResult = partition.deleteRecordsOnLeader(requestedOffset) |
| (topicPartition, logDeleteResult) |
| } catch { |
| case e@ (_: UnknownTopicOrPartitionException | |
| _: NotLeaderForPartitionException | |
| _: OffsetOutOfRangeException | |
| _: PolicyViolationException | |
| _: KafkaStorageException) => |
| (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e))) |
| case t: Throwable => |
| error("Error processing delete records operation on partition %s".format(topicPartition), t) |
| (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(t))) |
| } |
| } |
| } |
| } |
| |
| // If there exists a topic partition that meets the following requirements, |
| // we need to put a delayed DeleteRecordsRequest and wait for the delete records operation to complete |
| // |
| // 1. the delete records operation on this partition is successful |
| // 2. low watermark of this partition is smaller than the specified offset |
| private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = { |
| localDeleteRecordsResults.exists{ case (_, deleteRecordsResult) => |
| deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset |
| } |
| } |
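| |
| // Worked example: a partition whose local delete succeeded with lowWatermark = 5 against |
| // requestedOffset = 10 satisfies the condition above, so the request waits in the purgatory until |
| // all live replicas advance their logStartOffset to 10 (or the timeout expires). |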
| |
| /** |
| * For each pair of partition and log directory specified in the map, if the partition has already been created on |
| * this broker, move its log files to the specified log directory. Otherwise, record the pair in memory so that |
| * the partition will be created in the specified log directory when the broker receives a LeaderAndIsrRequest for the partition later. |
| */ |
| def alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors] = { |
| replicaStateChangeLock synchronized { |
| partitionDirs.map { case (topicPartition, destinationDir) => |
| try { |
| if (!logManager.isLogDirOnline(destinationDir)) |
| throw new KafkaStorageException(s"Log directory $destinationDir is offline") |
| |
| getPartition(topicPartition).foreach { partition => |
| if (partition eq ReplicaManager.OfflinePartition) |
| throw new KafkaStorageException(s"Partition $topicPartition is offline") |
| |
| // Stop current replica movement if the destinationDir is different from the existing destination log directory |
| if (partition.futureReplicaDirChanged(destinationDir)) { |
| replicaAlterLogDirsManager.removeFetcherForPartitions(Set(topicPartition)) |
| partition.removeFutureLocalReplica() |
| } |
| } |
| |
| // If the log for this partition has not been created yet: |
| // 1) Record the destination log directory in memory so that the partition will be created in this log directory |
| // when the broker receives a LeaderAndIsrRequest for this partition later. |
| // 2) Respond with a ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse |
| logManager.maybeUpdatePreferredLogDir(topicPartition, destinationDir) |
| |
| // throw a ReplicaNotAvailableException if the replica does not exist for the given partition |
| val partition = getPartitionOrException(topicPartition, expectLeader = false) |
| partition.localReplicaOrException |
| |
| // If the destinationDir is different from the current log directory of the replica: |
| // - If there is no offline log directory, create the future log in the destinationDir (if it does not exist) and |
| // start ReplicaAlterDirThread to move data of this partition from the current log to the future log |
| // - Otherwise, return KafkaStorageException. We do not create the future log while there is an offline log directory |
| // so that we can avoid creating the future log for the same partition in multiple log directories. |
| if (partition.maybeCreateFutureReplica(destinationDir)) { |
| val futureReplica = futureLocalReplicaOrException(topicPartition) |
| logManager.abortAndPauseCleaning(topicPartition) |
| |
| val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1), |
| partition.getLeaderEpoch, futureReplica.highWatermark.messageOffset) |
| replicaAlterLogDirsManager.addFetcherForPartitions(Map(topicPartition -> initialFetchState)) |
| } |
| |
| (topicPartition, Errors.NONE) |
| } catch { |
| case e@(_: LogDirNotFoundException | |
| _: ReplicaNotAvailableException | |
| _: KafkaStorageException) => |
| (topicPartition, Errors.forException(e)) |
| case t: Throwable => |
| error("Error while changing replica dir for partition %s".format(topicPartition), t) |
| (topicPartition, Errors.forException(t)) |
| } |
| } |
| } |
| } |
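| |
| // A hedged usage sketch (the topic and destination path are hypothetical): |
| // {{{ |
| // val results = replicaManager.alterReplicaLogDirs( |
| //   Map(new TopicPartition("topic", 0) -> "/mnt/kafka-logs-1")) |
| // results.foreach { case (tp, error) => println(s"$tp -> $error") } // Errors.NONE on success |
| // }}} |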
| |
| /* |
| * Get the LogDirInfo for the specified list of partitions. |
| * |
| * Each LogDirInfo specifies the following information for a given log directory: |
| * 1) Error of the log directory, e.g. whether the log is online or offline |
| * 2) size and lag of current and future logs for each partition in the given log directory. Only logs of the queried partitions |
| * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented. |
| */ |
| def describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo] = { |
| val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent) |
| |
| config.logDirs.toSet.map { logDir: String => |
| val absolutePath = new File(logDir).getAbsolutePath |
| try { |
| if (!logManager.isLogDirOnline(absolutePath)) |
| throw new KafkaStorageException(s"Log directory $absolutePath is offline") |
| |
| logsByDir.get(absolutePath) match { |
| case Some(logs) => |
| val replicaInfos = logs.filter { log => |
| partitions.contains(log.topicPartition) |
| }.map { log => |
| log.topicPartition -> new ReplicaInfo(log.size, getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture), log.isFuture) |
| }.toMap |
| |
| (absolutePath, new LogDirInfo(Errors.NONE, replicaInfos.asJava)) |
| case None => |
| (absolutePath, new LogDirInfo(Errors.NONE, Map.empty[TopicPartition, ReplicaInfo].asJava)) |
| } |
| |
| } catch { |
| case _: KafkaStorageException => |
| (absolutePath, new LogDirInfo(Errors.KAFKA_STORAGE_ERROR, Map.empty[TopicPartition, ReplicaInfo].asJava)) |
| case t: Throwable => |
| error(s"Error while describing replica in dir $absolutePath", t) |
| (absolutePath, new LogDirInfo(Errors.forException(t), Map.empty[TopicPartition, ReplicaInfo].asJava)) |
| } |
| }.toMap |
| } |
| |
| def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = { |
| localReplica(topicPartition) match { |
| case Some(replica) => |
| if (isFuture) |
| replica.logEndOffset.messageOffset - logEndOffset |
| else |
| math.max(replica.highWatermark.messageOffset - logEndOffset, 0) |
| case None => |
| // return -1L to indicate that the LEO lag is not available if the replica is not created or is offline |
| DescribeLogDirsResponse.INVALID_OFFSET_LAG |
| } |
| } |
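| |
| // Worked example with illustrative numbers: for a current replica with highWatermark = 100 and a |
| // queried logEndOffset = 90, the reported lag is max(100 - 90, 0) = 10; for a future replica the |
| // lag is measured against the current replica's log end offset instead. |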
| |
| def deleteRecords(timeout: Long, |
| offsetPerPartition: Map[TopicPartition, Long], |
| responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) { |
| val timeBeforeLocalDeleteRecords = time.milliseconds |
| val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition) |
| debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords)) |
| |
| val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) => |
| topicPartition -> |
| DeleteRecordsPartitionStatus( |
| result.requestedOffset, // requested offset |
| new DeleteRecordsResponse.PartitionResponse(result.lowWatermark, result.error)) // response status |
| } |
| |
| if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) { |
| // create delayed delete records operation |
| val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback) |
| |
| // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation |
| val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq |
| |
| // try to complete the request immediately, otherwise put it into the purgatory |
| // this is because while the delayed delete records operation is being created, new |
| // requests may arrive and hence make this operation completable. |
| delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys) |
| } else { |
| // we can respond immediately |
| val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus) |
| responseCallback(deleteRecordsResponseStatus) |
| } |
| } |
| |
| // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete |
| // |
| // 1. required acks = -1 |
| // 2. there is data to append |
| // 3. at least one partition append was successful (fewer errors than partitions) |
| private def delayedProduceRequestRequired(requiredAcks: Short, |
| entriesPerPartition: Map[TopicPartition, MemoryRecords], |
| localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = { |
| requiredAcks == -1 && |
| entriesPerPartition.nonEmpty && |
| localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size |
| } |
| |
| private def isValidRequiredAcks(requiredAcks: Short): Boolean = { |
| requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0 |
| } |
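| |
| // For illustration: |
| // {{{ |
| // isValidRequiredAcks(-1) // true: wait for the full ISR (may use the delayed produce purgatory) |
| // isValidRequiredAcks(1)  // true: leader-only acknowledgement |
| // isValidRequiredAcks(0)  // true: no acknowledgement |
| // isValidRequiredAcks(2)  // false: rejected with INVALID_REQUIRED_ACKS in appendRecords |
| // }}} |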
| |
| /** |
| * Append the messages to the local replica logs |
| */ |
| private def appendToLocalLog(internalTopicsAllowed: Boolean, |
| isFromClient: Boolean, |
| entriesPerPartition: Map[TopicPartition, MemoryRecords], |
| requiredAcks: Short): Map[TopicPartition, LogAppendResult] = { |
| trace(s"Append [$entriesPerPartition] to local log") |
| entriesPerPartition.map { case (topicPartition, records) => |
| brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark() |
| brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark() |
| |
| // reject appending to internal topics if it is not allowed |
| if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) { |
| (topicPartition, LogAppendResult( |
| LogAppendInfo.UnknownLogAppendInfo, |
| Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")))) |
| } else { |
| try { |
| val partition = getPartitionOrException(topicPartition, expectLeader = true) |
| val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks) |
| val numAppendedMessages = info.numMessages |
| |
| // update stats for successfully appended bytes and messages as bytesInRate and messageInRate |
| brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes) |
| brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes) |
| brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages) |
| brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages) |
| |
| trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " + |
| s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}") |
| (topicPartition, LogAppendResult(info)) |
| } catch { |
| // NOTE: Failed produce requests metric is not incremented for known exceptions |
| // since it is supposed to indicate unexpected failures of a broker in handling a produce request |
| case e@ (_: UnknownTopicOrPartitionException | |
| _: NotLeaderForPartitionException | |
| _: RecordTooLargeException | |
| _: RecordBatchTooLargeException | |
| _: CorruptRecordException | |
| _: KafkaStorageException | |
| _: InvalidTimestampException) => |
| (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) |
| case t: Throwable => |
| val logStartOffset = getPartition(topicPartition) match { |
| case Some(partition) => |
| partition.logStartOffset |
| case _ => |
| -1 |
| } |
| brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark() |
| brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark() |
| error(s"Error processing append operation on partition $topicPartition", t) |
| (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t))) |
| } |
| } |
| } |
| } |
| |
| def fetchOffsetForTimestamp(topicPartition: TopicPartition, |
| timestamp: Long, |
| isolationLevel: Option[IsolationLevel], |
| currentLeaderEpoch: Optional[Integer], |
| fetchOnlyFromLeader: Boolean): TimestampOffset = { |
| val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader) |
| partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader) |
| } |
| |
| def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition, |
| timestamp: Long, |
| maxNumOffsets: Int, |
| isFromConsumer: Boolean, |
| fetchOnlyFromLeader: Boolean): Seq[Long] = { |
| val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader) |
| partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader) |
| } |
| |
| /** |
| * Fetch messages from the leader replica, and wait until enough data can be fetched and return; |
| * the callback function will be triggered either when the timeout expires or the required fetch info is satisfied |
| */ |
| def fetchMessages(timeout: Long, |
| replicaId: Int, |
| fetchMinBytes: Int, |
| fetchMaxBytes: Int, |
| hardMaxBytesLimit: Boolean, |
| fetchInfos: Seq[(TopicPartition, PartitionData)], |
| quota: ReplicaQuota = UnboundedQuota, |
| responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, |
| isolationLevel: IsolationLevel) { |
| val isFromFollower = Request.isValidBrokerId(replicaId) |
| val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId |
| |
| val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId) |
| FetchLogEnd |
| else if (isolationLevel == IsolationLevel.READ_COMMITTED) |
| FetchTxnCommitted |
| else |
| FetchHighWatermark |
| |
| |
| def readFromLog(): Seq[(TopicPartition, LogReadResult)] = { |
| val result = readFromLocalLog( |
| replicaId = replicaId, |
| fetchOnlyFromLeader = fetchOnlyFromLeader, |
| fetchIsolation = fetchIsolation, |
| fetchMaxBytes = fetchMaxBytes, |
| hardMaxBytesLimit = hardMaxBytesLimit, |
| readPartitionInfo = fetchInfos, |
| quota = quota) |
| if (isFromFollower) updateFollowerLogReadResults(replicaId, result) |
| else result |
| } |
| |
| val logReadResults = readFromLog() |
| |
| // check if this fetch request can be satisfied right away |
| val logReadResultValues = logReadResults.map { case (_, v) => v } |
| val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum |
| val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) => |
| errorIncurred || (readResult.error != Errors.NONE)) |
| |
| // respond immediately if 1) the fetch request does not want to wait |
| // 2) the fetch request does not require any data |
| // 3) enough data is available to respond |
| // 4) an error occurs while reading data |
| if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { |
| val fetchPartitionData = logReadResults.map { case (tp, result) => |
| tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, |
| result.lastStableOffset, result.info.abortedTransactions) |
| } |
| responseCallback(fetchPartitionData) |
| } else { |
| // construct the fetch results from the read results |
| val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) => |
| val fetchInfo = fetchInfos.collectFirst { |
| case (tp, v) if tp == topicPartition => v |
| }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos")) |
| (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo)) |
| } |
| val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, |
| fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) |
| val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback) |
| |
| // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation |
| val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) } |
| |
| // try to complete the request immediately, otherwise put it into the purgatory; |
| // this is because while the delayed fetch operation is being created, new requests |
| // may arrive and hence make this operation completable. |
| delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) |
| } |
| } |
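| |
| // A hedged sketch of a consumer-style fetchMessages call (all values are illustrative placeholders): |
| // {{{ |
| // replicaManager.fetchMessages( |
| //   timeout = 500L, |
| //   replicaId = Request.OrdinaryConsumerId, |
| //   fetchMinBytes = 1, |
| //   fetchMaxBytes = 1048576, |
| //   hardMaxBytesLimit = false, |
| //   fetchInfos = Seq(new TopicPartition("topic", 0) -> |
| //     new PartitionData(0L, 0L, 1048576, Optional.empty[Integer]())), |
| //   responseCallback = results => results.foreach { case (tp, data) => |
| //     println(s"$tp: ${data.records.sizeInBytes} bytes, hw=${data.highWatermark}") |
| //   }, |
| //   isolationLevel = IsolationLevel.READ_UNCOMMITTED) |
| // }}} |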
| |
| /** |
| * Read from multiple topic partitions at the given offset up to fetchMaxBytes bytes |
| */ |
| def readFromLocalLog(replicaId: Int, |
| fetchOnlyFromLeader: Boolean, |
| fetchIsolation: FetchIsolation, |
| fetchMaxBytes: Int, |
| hardMaxBytesLimit: Boolean, |
| readPartitionInfo: Seq[(TopicPartition, PartitionData)], |
| quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = { |
| |
| def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { |
| val offset = fetchInfo.fetchOffset |
| val partitionFetchSize = fetchInfo.maxBytes |
| val followerLogStartOffset = fetchInfo.logStartOffset |
| |
| brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark() |
| brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() |
| |
| val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes) |
| try { |
| trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " + |
| s"remaining response limit $limitBytes" + |
| (if (minOneMessage) s", ignoring response/partition size limits" else "")) |
| |
| val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader) |
| val fetchTimeMs = time.milliseconds |
| |
| // Try the read first, this tells us whether we need all of adjustedMaxBytes for this partition |
| val readInfo = partition.readRecords( |
| fetchOffset = fetchInfo.fetchOffset, |
| currentLeaderEpoch = fetchInfo.currentLeaderEpoch, |
| maxBytes = adjustedMaxBytes, |
| fetchIsolation = fetchIsolation, |
| fetchOnlyFromLeader = fetchOnlyFromLeader, |
| minOneMessage = minOneMessage) |
| |
| val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) { |
| // If the partition is being throttled, simply return an empty set. |
| FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) |
| } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) { |
| // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make |
| // progress in such cases and don't need to report a `RecordTooLargeException` |
| FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY) |
| } else { |
| readInfo.fetchedData |
| } |
| |
| LogReadResult(info = fetchDataInfo, |
| highWatermark = readInfo.highWatermark, |
| leaderLogStartOffset = readInfo.logStartOffset, |
| leaderLogEndOffset = readInfo.logEndOffset, |
| followerLogStartOffset = followerLogStartOffset, |
| fetchTimeMs = fetchTimeMs, |
| readSize = adjustedMaxBytes, |
| lastStableOffset = Some(readInfo.lastStableOffset), |
| exception = None) |
| } catch { |
| // NOTE: Failed fetch requests metric is not incremented for known exceptions since it |
| // is supposed to indicate unexpected failure of a broker in handling a fetch request |
| case e@ (_: UnknownTopicOrPartitionException | |
| _: NotLeaderForPartitionException | |
| _: UnknownLeaderEpochException | |
| _: FencedLeaderEpochException | |
| _: ReplicaNotAvailableException | |
| _: KafkaStorageException | |
| _: OffsetOutOfRangeException) => |
| LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), |
| highWatermark = -1L, |
| leaderLogStartOffset = -1L, |
| leaderLogEndOffset = -1L, |
| followerLogStartOffset = -1L, |
| fetchTimeMs = -1L, |
| readSize = 0, |
| lastStableOffset = None, |
| exception = Some(e)) |
| case e: Throwable => |
| brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark() |
| brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark() |
| |
| val fetchSource = Request.describeReplicaId(replicaId) |
| error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " + |
| s"on partition $tp: $fetchInfo", e) |
| |
| LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY), |
| highWatermark = -1L, |
| leaderLogStartOffset = -1L, |
| leaderLogEndOffset = -1L, |
| followerLogStartOffset = -1L, |
| fetchTimeMs = -1L, |
| readSize = 0, |
| lastStableOffset = None, |
| exception = Some(e)) |
| } |
| } |
| |
| var limitBytes = fetchMaxBytes |
| val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)] |
| var minOneMessage = !hardMaxBytesLimit |
| readPartitionInfo.foreach { case (tp, fetchInfo) => |
| val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) |
| val recordBatchSize = readResult.info.records.sizeInBytes |
| // Once we read from a non-empty partition, we stop ignoring request and partition level size limits |
| if (recordBatchSize > 0) |
| minOneMessage = false |
| limitBytes = math.max(0, limitBytes - recordBatchSize) |
| result += (tp -> readResult) |
| } |
| result |
| } |
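| |
| // Worked illustration of the accumulation loop above: with fetchMaxBytes = 1000 and three |
| // partitions returning 600, 600 and 0 bytes, the first read consumes 600 bytes of the limit, the |
| // second read is capped at the remaining 400, and minOneMessage flips to false after the first |
| // non-empty read so later oversized first messages are no longer force-included. |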
| |
| /** |
| * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list, |
| * the quota is exceeded and the replica is not in sync. |
| */ |
| def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = { |
| val isReplicaInSync = nonOfflinePartition(topicPartition).exists { partition => |
| partition.getReplica(replicaId).exists(partition.inSyncReplicas.contains) |
| } |
| quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync |
| } |
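| |
| // For illustration: a quota-exceeded fetch from a throttled but in-sync follower is NOT throttled; |
| // delaying an in-sync follower could push it out of the ISR and cause the thrashing described above. |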
| |
| def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = localReplica(topicPartition).flatMap(_.log.map(_.config)) |
| |
| def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value) |
| |
| def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = { |
| replicaStateChangeLock synchronized { |
| if (updateMetadataRequest.controllerEpoch < controllerEpoch) { |
| val stateControllerEpochErrorMessage = s"Received update metadata request with correlation id $correlationId " + |
| s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. " + |
| s"Latest known controller epoch is $controllerEpoch" |
| stateChangeLogger.warn(stateControllerEpochErrorMessage) |
| throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage)) |
| } else { |
| val deletedPartitions = metadataCache.updateMetadata(correlationId, updateMetadataRequest) |
| controllerEpoch = updateMetadataRequest.controllerEpoch |
| deletedPartitions |
| } |
| } |
| } |
| |
| def becomeLeaderOrFollower(correlationId: Int, |
| leaderAndIsrRequest: LeaderAndIsrRequest, |
| onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = { |
| leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => |
| stateChangeLogger.trace(s"Received LeaderAndIsr request $stateInfo " + |
| s"correlation id $correlationId from controller ${leaderAndIsrRequest.controllerId} " + |
| s"epoch ${leaderAndIsrRequest.controllerEpoch} for partition $topicPartition") |
| } |
| replicaStateChangeLock synchronized { |
| if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) { |
| stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller ${leaderAndIsrRequest.controllerId} with " + |
| s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " + |
| s"Latest known controller epoch is $controllerEpoch") |
| leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception) |
| } else { |
| val responseMap = new mutable.HashMap[TopicPartition, Errors] |
| val controllerId = leaderAndIsrRequest.controllerId |
| controllerEpoch = leaderAndIsrRequest.controllerEpoch |
| |
| // First check partition's leader epoch |
| val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]() |
| val newPartitions = new mutable.HashSet[Partition] |
| |
| leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => |
| val partition = getPartition(topicPartition).getOrElse { |
| val createdPartition = getOrCreatePartition(topicPartition) |
| newPartitions.add(createdPartition) |
| createdPartition |
| } |
| val currentLeaderEpoch = partition.getLeaderEpoch |
| val requestLeaderEpoch = stateInfo.basePartitionState.leaderEpoch |
| if (partition eq ReplicaManager.OfflinePartition) { |
| stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + |
| s"controller $controllerId with correlation id $correlationId " + |
| s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " + |
| "partition is in an offline log directory") |
| responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) |
| } else if (requestLeaderEpoch > currentLeaderEpoch) { |
| // If the leader epoch is valid, record the epoch of the controller that made the leadership decision. |
| // This is useful while updating the ISR to maintain the decision maker controller's epoch in the ZooKeeper path |
| if (stateInfo.basePartitionState.replicas.contains(localBrokerId)) |
| partitionState.put(partition, stateInfo) |
| else { |
| stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " + |
| s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " + |
| s"in assigned replica list ${stateInfo.basePartitionState.replicas.asScala.mkString(",")}") |
| responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) |
| } |
| } else { |
| // Otherwise record the error code in response |
| stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + |
| s"controller $controllerId with correlation id $correlationId " + |
| s"epoch $controllerEpoch for partition $topicPartition since its associated " + |
| s"leader epoch $requestLeaderEpoch is not higher than the current " + |
| s"leader epoch $currentLeaderEpoch") |
| responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) |
| } |
| } |
| |
| val partitionsToBeLeader = partitionState.filter { case (_, stateInfo) => |
| stateInfo.basePartitionState.leader == localBrokerId |
| } |
| val partitionsToBeFollower = partitionState -- partitionsToBeLeader.keys |
| |
| val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty) |
| makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap) |
| else |
| Set.empty[Partition] |
| val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) |
| makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap) |
| else |
| Set.empty[Partition] |
| |
| leaderAndIsrRequest.partitionStates.asScala.keys.foreach { topicPartition => |
| /* |
| * If there is an offline log directory, a Partition object may have been created by getOrCreatePartition() |
| * before getOrCreateReplica() failed to create the local replica due to a KafkaStorageException. |
| * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object. |
| * We need to map this topic-partition to OfflinePartition instead. |
| */ |
| if (localReplica(topicPartition).isEmpty && (allPartitions.get(topicPartition) ne ReplicaManager.OfflinePartition)) |
| allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) |
| } |
| |
| // We initialize the highwatermark thread after the first LeaderAndIsrRequest. This ensures that all the partitions |
| // have been completely populated before starting the checkpointing, thereby avoiding weird race conditions |
| if (!hwThreadInitialized) { |
| startHighWaterMarksCheckPointThread() |
| hwThreadInitialized = true |
| } |
| |
| val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState] |
| for (partition <- newPartitions) { |
| val topicPartition = partition.topicPartition |
| if (logManager.getLog(topicPartition, isFuture = true).isDefined) { |
| partition.localReplica.foreach { replica => |
| val leader = BrokerEndPoint(config.brokerId, "localhost", -1) |
| |
| // Add future replica to partition's map |
| partition.getOrCreateReplica(Request.FutureLocalReplicaId, isNew = false) |
| |
| // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move |
| // replica from source dir to destination dir |
| logManager.abortAndPauseCleaning(topicPartition) |
| |
| futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader, |
| partition.getLeaderEpoch, replica.highWatermark.messageOffset)) |
| } |
| } |
| } |
| replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset) |
| |
| replicaFetcherManager.shutdownIdleFetcherThreads() |
| replicaAlterLogDirsManager.shutdownIdleFetcherThreads() |
| onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) |
| new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava) |
| } |
| } |
| } |
| |
| /* |
| * Make the current broker become the leader for a given set of partitions by: |
| * |
| * 1. Stop fetchers for these partitions |
| * 2. Update the partition metadata in cache |
| * 3. Add these partitions to the leader partitions set |
| * |
| * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where |
| * the error message will be set on each partition since we do not know which partition caused it. Otherwise, |
| * return the set of partitions that are made leader due to this method |
| * |
| * TODO: the above may need to be fixed later |
| */ |
| private def makeLeaders(controllerId: Int, |
| epoch: Int, |
| partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState], |
| correlationId: Int, |
| responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = { |
| partitionState.keys.foreach { partition => |
| stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from " + |
| s"controller $controllerId epoch $epoch starting the become-leader transition for " + |
| s"partition ${partition.topicPartition}") |
| } |
| |
| for (partition <- partitionState.keys) |
| responseMap.put(partition.topicPartition, Errors.NONE) |
| |
| val partitionsToMakeLeaders = mutable.Set[Partition]() |
| |
| try { |
| // First stop fetchers for all the partitions |
| replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition)) |
| // Update the partition information to be the leader |
| partitionState.foreach{ case (partition, partitionStateInfo) => |
| try { |
| if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) { |
| partitionsToMakeLeaders += partition |
| stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " + |
| s"controller $controllerId epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} " + |
| s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch})") |
| } else |
| stateChangeLogger.info(s"Skipped the become-leader state change after marking its " + |
| s"partition as leader with correlation id $correlationId from controller $controllerId epoch $epoch for " + |
| s"partition ${partition.topicPartition} (last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " + |
| s"since it is already the leader for the partition.") |
| } catch { |
| case e: KafkaStorageException => |
| stateChangeLogger.error(s"Skipped the become-leader state change with " + |
| s"correlation id $correlationId from controller $controllerId epoch $epoch for partition ${partition.topicPartition} " + |
| s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) since " + |
| s"the replica for the partition is offline due to disk error $e") |
| val dirOpt = getLogDir(partition.topicPartition) |
| error(s"Error while making broker the leader for partition $partition in dir $dirOpt", e) |
| responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR) |
| } |
| } |
| |
| } catch { |
| case e: Throwable => |
| partitionState.keys.foreach { partition => |
| stateChangeLogger.error(s"Error while processing LeaderAndIsr request correlationId $correlationId received " + |
| s"from controller $controllerId epoch $epoch for partition ${partition.topicPartition}", e) |
| } |
| // Re-throw the exception for it to be caught in KafkaApis |
| throw e |
| } |
| |
| partitionState.keys.foreach { partition => |
| stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " + |
| s"epoch $epoch for the become-leader transition for partition ${partition.topicPartition}") |
| } |
| |
| partitionsToMakeLeaders |
| } |
| |
| /* |
| * Make the current broker to become follower for a given set of partitions by: |
| * |
| * 1. Remove these partitions from the leader partitions set. |
| * 2. Mark the replicas as followers so that no more data can be added from the producer clients. |
| * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads. |
| * 4. Truncate the log and checkpoint offsets for these partitions. |
| * 5. Clear the produce and fetch requests in the purgatory |
| * 6. If the broker is not shutting down, add the fetcher to the new leaders. |
| * |
| * The ordering of doing these steps make sure that the replicas in transition will not |
| * take any more messages before checkpointing offsets so that all messages before the checkpoint |
| * are guaranteed to be flushed to disks |
| * |
| * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where |
| * the error message will be set on each partition since we do not know which partition caused it. Otherwise, |
| * return the set of partitions that are made follower due to this method |
| */ |
| private def makeFollowers(controllerId: Int, |
| epoch: Int, |
| partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState], |
| correlationId: Int, |
| responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = { |
| partitionStates.foreach { case (partition, partitionState) => |
| stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + |
| s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + |
| s"${partitionState.basePartitionState.leader}") |
| } |
| |
| for (partition <- partitionStates.keys) |
| responseMap.put(partition.topicPartition, Errors.NONE) |
| |
| val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() |
| |
| try { |
| // TODO: Delete leaders from LeaderAndIsrRequest |
| partitionStates.foreach { case (partition, partitionStateInfo) => |
| val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader |
| try { |
| metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { |
| // Only change partition state when the leader is available |
| case Some(_) => |
| if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) |
| partitionsToMakeFollower += partition |
| else |
| stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " + |
| s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " + |
| s"for partition ${partition.topicPartition} (last update " + |
| s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " + |
| s"since the new leader $newLeaderBrokerId is the same as the old leader") |
| case None => |
| // The leader broker should always be present in the metadata cache. |
| // If not, we should record the error message and abort the transition process for this partition |
| stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " + |
| s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " + |
| s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " + |
| s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") |
| // Create the local replica even if the leader is unavailable. This is required to ensure that we include |
| // the partition's high watermark in the checkpoint file (see KAFKA-1647) |
| partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew) |
| } |
| } catch { |
| case e: KafkaStorageException => |
| stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " + |
| s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " + |
| s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " + |
| s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e") |
| val dirOpt = getLogDir(partition.topicPartition) |
| error(s"Error while making broker the follower for partition $partition with leader " + |
| s"$newLeaderBrokerId in dir $dirOpt", e) |
| responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR) |
| } |
| } |
| |
| replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition)) |
| partitionsToMakeFollower.foreach { partition => |
| stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " + |
| s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " + |
| s"${partitionStates(partition).basePartitionState.leader}") |
| } |
| |
| partitionsToMakeFollower.foreach { partition => |
| val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition) |
| tryCompleteDelayedProduce(topicPartitionOperationKey) |
| tryCompleteDelayedFetch(topicPartitionOperationKey) |
| } |
| |
| partitionsToMakeFollower.foreach { partition => |
| stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " + |
| s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " + |
| s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}") |
| } |
| |
| if (isShuttingDown.get()) { |
| partitionsToMakeFollower.foreach { partition => |
| stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " + |
| s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " + |
| s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " + |
| "since it is shutting down") |
| } |
| } |
| else { |
| // we do not need to check if the leader exists again since this has been done at the beginning of this process |
| val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition => |
| val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get |
| .brokerEndPoint(config.interBrokerListenerName) |
| val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset |
| partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset) |
| }.toMap |
| replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) |
| |
| partitionsToMakeFollower.foreach { partition => |
| stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " + |
| s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " + |
| s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}") |
| } |
| } |
| } catch { |
| case e: Throwable => |
| stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " + |
| s"received from controller $controllerId epoch $epoch", e) |
| // Re-throw the exception for it to be caught in KafkaApis |
| throw e |
| } |
| |
| partitionStates.keys.foreach { partition => |
| stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " + |
| s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " + |
| s"${partitionStates(partition).basePartitionState.leader}") |
| } |
| |
| partitionsToMakeFollower |
| } |
| |
| private def maybeShrinkIsr(): Unit = { |
| trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") |
| nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) |
| } |
| |
| /** |
| * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`, |
| * if the follower replica is not recognized to be one of the assigned replicas. Do not update |
| * `readResult` otherwise, so that log start/end offset and high watermark is consistent with |
| * records in fetch response. Log start/end offset and high watermark may change not only due to |
| * this fetch request, e.g., rolling new log segment and removing old log segment may move log |
| * start offset further than the last offset in the fetched records. The followers will get the |
| * updated leader's state in the next fetch response. |
| */ |
| private def updateFollowerLogReadResults(replicaId: Int, |
| readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = { |
| debug(s"Recording follower broker $replicaId log end offsets: $readResults") |
| readResults.map { case (topicPartition, readResult) => |
| var updatedReadResult = readResult |
| nonOfflinePartition(topicPartition) match { |
| case Some(partition) => |
| partition.getReplica(replicaId) match { |
| case Some(replica) => |
| partition.updateReplicaLogReadResult(replica, readResult) |
| case None => |
| warn(s"Leader $localBrokerId failed to record follower $replicaId's position " + |
| s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " + |
| s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " + |
| s"for partition $topicPartition. Empty records will be returned for this partition.") |
| updatedReadResult = readResult.withEmptyFetchInfo |
| } |
| case None => |
| warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.") |
| } |
| topicPartition -> updatedReadResult |
| } |
| } |
| |
| private def leaderPartitionsIterator: Iterator[Partition] = |
| nonOfflinePartitionsIterator.filter(_.leaderReplicaIfLocal.isDefined) |
| |
| def getLogEndOffset(topicPartition: TopicPartition): Option[Long] = |
| nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset)) |
| |
// Flushes the high watermark value of every online partition to the high watermark checkpoint file of its log directory
| def checkpointHighWatermarks() { |
| val replicas = nonOfflinePartitionsIterator.flatMap { partition => |
| val replicasList: mutable.Set[Replica] = mutable.Set() |
| partition.localReplica.foreach(replicasList.add) |
| partition.futureLocalReplica.foreach(replicasList.add) |
| replicasList |
| }.filter(_.log.isDefined).toBuffer |
| val replicasByDir = replicas.groupBy(_.log.get.dir.getParent) |
| for ((dir, reps) <- replicasByDir) { |
| val hwms = reps.map(r => r.topicPartition -> r.highWatermark.messageOffset).toMap |
| try { |
| highWatermarkCheckpoints.get(dir).foreach(_.write(hwms)) |
| } catch { |
| case e: KafkaStorageException => |
| error(s"Error while writing to highwatermark file in directory $dir", e) |
| } |
| } |
| } |
| |
// Used only by tests
| def markPartitionOffline(tp: TopicPartition) { |
| allPartitions.put(tp, ReplicaManager.OfflinePartition) |
| } |
| |
// logDir should be an absolute path
// sendZkNotification is a parameter only so unit tests can skip the ZooKeeper notification
| def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true) { |
| if (!logManager.isLogDirOnline(dir)) |
| return |
| info(s"Stopping serving replicas in dir $dir") |
| replicaStateChangeLock synchronized { |
| val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition => |
| partition.localReplica.exists { replica => |
| replica.log.isDefined && replica.log.get.dir.getParent == dir |
| } |
| }.map(_.topicPartition).toSet |
| |
| val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition => |
| partition.futureLocalReplica.exists { replica => |
| replica.log.isDefined && replica.log.get.dir.getParent == dir |
| } |
| }.toSet |
| |
| replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions) |
| replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition)) |
| |
| partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false)) |
| newOfflinePartitions.foreach { topicPartition => |
| val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) |
| partition.removePartitionMetrics() |
| } |
| newOfflinePartitions.map(_.topic).foreach { topic: String => |
| val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic) |
| if (!topicHasPartitions) |
| brokerTopicStats.removeMetrics(topic) |
| } |
| highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir) |
| |
| info(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and stopped moving logs " + |
| s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.") |
| } |
| logManager.handleLogDirFailure(dir) |
| |
| if (sendZkNotification) |
| zkClient.propagateLogDirEvent(localBrokerId) |
| info(s"Stopped serving replicas in dir $dir") |
| } |
| |
| def removeMetrics() { |
| removeMetric("LeaderCount") |
| removeMetric("PartitionCount") |
| removeMetric("OfflineReplicaCount") |
| removeMetric("UnderReplicatedPartitions") |
| removeMetric("UnderMinIsrPartitionCount") |
| } |
| |
// checkpointHW is false only in unit tests, where high watermarks need not be checkpointed
| def shutdown(checkpointHW: Boolean = true) { |
| info("Shutting down") |
| removeMetrics() |
| if (logDirFailureHandler != null) |
| logDirFailureHandler.shutdown() |
| replicaFetcherManager.shutdown() |
| replicaAlterLogDirsManager.shutdown() |
| delayedFetchPurgatory.shutdown() |
| delayedProducePurgatory.shutdown() |
| delayedDeleteRecordsPurgatory.shutdown() |
| if (checkpointHW) |
| checkpointHighWatermarks() |
| info("Shut down completely") |
| } |
| |
| protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = { |
| new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) |
| } |
| |
| protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = { |
| new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats) |
| } |
| |
| def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = { |
| requestedEpochInfo.map { case (tp, partitionData) => |
| val epochEndOffset = getPartition(tp) match { |
| case Some(partition) => |
| if (partition eq ReplicaManager.OfflinePartition) |
| new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) |
| else |
| partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, partitionData.leaderEpoch, |
| fetchOnlyFromLeader = true) |
| |
| case None if metadataCache.contains(tp) => |
| new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) |
| |
| case None => |
| new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) |
| } |
| tp -> epochEndOffset |
| } |
| } |
| } |