| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.log |
| |
| import java.io.File |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.locks.ReentrantLock |
| |
| import com.yammer.metrics.core.Gauge |
| import kafka.common.{LogCleaningAbortedException, TopicAndPartition} |
| import kafka.metrics.KafkaMetricsGroup |
| import kafka.server.OffsetCheckpoint |
| import kafka.utils.CoreUtils._ |
| import kafka.utils.{Logging, Pool, Time} |
| |
| import scala.collection.{immutable, mutable} |
| |
/* The per-partition cleaning states managed by LogCleanerManager (see its scaladoc for the transitions). */
private[log] sealed trait LogCleaningState
/* The partition has been handed to a cleaner thread and is actively being cleaned. */
private[log] case object LogCleaningInProgress extends LogCleaningState
/* An abort has been requested; the cleaner thread will observe this state and stop. */
private[log] case object LogCleaningAborted extends LogCleaningState
/* Cleaning is paused; the partition is not eligible for cleaning until resumed. */
private[log] case object LogCleaningPaused extends LogCleaningState
| |
| /** |
| * Manage the state of each partition being cleaned. |
| * If a partition is to be cleaned, it enters the LogCleaningInProgress state. |
| * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters |
| * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state. |
| * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is |
| * requested to be resumed. |
| */ |
| private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup { |
| |
| import LogCleanerManager._ |
| |
| override val loggerName = classOf[LogCleaner].getName |
| |
| // package-private for testing |
| private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint" |
| |
| /* the offset checkpoints holding the last cleaned point for each log */ |
| private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap |
| |
| /* the set of logs currently being cleaned */ |
| private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]() |
| |
| /* a global lock used to control all access to the in-progress set and the offset checkpoints */ |
| private val lock = new ReentrantLock |
| |
| /* for coordinating the pausing and the cleaning of a partition */ |
| private val pausedCleaningCond = lock.newCondition() |
| |
| /* a gauge for tracking the cleanable ratio of the dirtiest log */ |
| @volatile private var dirtiestLogCleanableRatio = 0.0 |
| newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt }) |
| |
| /** |
| * @return the position processed for all logs. |
| */ |
| def allCleanerCheckpoints: Map[TopicAndPartition, Long] = |
| checkpoints.values.flatMap(_.read()).toMap |
| |
| /** |
| * Choose the log to clean next and add it to the in-progress set. We recompute this |
| * each time from the full set of logs to allow logs to be dynamically added to the pool of logs |
| * the log manager maintains. |
| */ |
| def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = { |
| inLock(lock) { |
| val now = time.milliseconds |
| val lastClean = allCleanerCheckpoints |
| val dirtyLogs = logs.filter { |
| case (_, log) => log.config.compact // match logs that are marked as compacted |
| }.filterNot { |
| case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress |
| }.map { |
| case (topicAndPartition, log) => // create a LogToClean instance for each |
| val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicAndPartition, |
| lastClean, now) |
| LogToClean(topicAndPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset) |
| }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs |
| |
| this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0 |
| // and must meet the minimum threshold for dirty byte ratio |
| val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio) |
| if(cleanableLogs.isEmpty) { |
| None |
| } else { |
| val filthiest = cleanableLogs.max |
| inProgress.put(filthiest.topicPartition, LogCleaningInProgress) |
| Some(filthiest) |
| } |
| } |
| } |
| |
| /** |
| * Find any logs that have compact and delete enabled |
| */ |
| def deletableLogs(): Iterable[(TopicAndPartition, Log)] = { |
| inLock(lock) { |
| val toClean = logs.filterNot { |
| case (topicAndPartition, log) => inProgress.contains(topicAndPartition) |
| }.filter { |
| case (topicAndPartition, log) => isCompactAndDelete(log) |
| } |
| toClean.foreach{x => inProgress.put(x._1, LogCleaningInProgress)} |
| toClean |
| } |
| |
| } |
| |
| /** |
| * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of |
| * the partition is aborted. |
| * This is implemented by first abortAndPausing and then resuming the cleaning of the partition. |
| */ |
| def abortCleaning(topicAndPartition: TopicAndPartition) { |
| inLock(lock) { |
| abortAndPauseCleaning(topicAndPartition) |
| resumeCleaning(topicAndPartition) |
| } |
| info(s"The cleaning for partition $topicAndPartition is aborted") |
| } |
| |
| /** |
| * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. |
| * This call blocks until the cleaning of the partition is aborted and paused. |
| * 1. If the partition is not in progress, mark it as paused. |
| * 2. Otherwise, first mark the state of the partition as aborted. |
| * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it |
| * throws a LogCleaningAbortedException to stop the cleaning task. |
| * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused. |
| * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused. |
| */ |
| def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) { |
| inLock(lock) { |
| inProgress.get(topicAndPartition) match { |
| case None => |
| inProgress.put(topicAndPartition, LogCleaningPaused) |
| case Some(state) => |
| state match { |
| case LogCleaningInProgress => |
| inProgress.put(topicAndPartition, LogCleaningAborted) |
| case s => |
| throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be aborted and paused since it is in $s state.") |
| } |
| } |
| while (!isCleaningInState(topicAndPartition, LogCleaningPaused)) |
| pausedCleaningCond.await(100, TimeUnit.MILLISECONDS) |
| } |
| info(s"The cleaning for partition $topicAndPartition is aborted and paused") |
| } |
| |
| /** |
| * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. |
| */ |
| def resumeCleaning(topicAndPartition: TopicAndPartition) { |
| inLock(lock) { |
| inProgress.get(topicAndPartition) match { |
| case None => |
| throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is not paused.") |
| case Some(state) => |
| state match { |
| case LogCleaningPaused => |
| inProgress.remove(topicAndPartition) |
| case s => |
| throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is in $s state.") |
| } |
| } |
| } |
| info(s"Compaction for partition $topicAndPartition is resumed") |
| } |
| |
| /** |
| * Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call. |
| */ |
| private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = { |
| inProgress.get(topicAndPartition) match { |
| case None => false |
| case Some(state) => |
| if (state == expectedState) |
| true |
| else |
| false |
| } |
| } |
| |
| /** |
| * Check if the cleaning for a partition is aborted. If so, throw an exception. |
| */ |
| def checkCleaningAborted(topicAndPartition: TopicAndPartition) { |
| inLock(lock) { |
| if (isCleaningInState(topicAndPartition, LogCleaningAborted)) |
| throw new LogCleaningAbortedException() |
| } |
| } |
| |
| def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { |
| inLock(lock) { |
| val checkpoint = checkpoints(dataDir) |
| val existing = checkpoint.read().filterKeys(logs.keys) ++ update |
| checkpoint.write(existing) |
| } |
| } |
| |
| def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) { |
| inLock(lock) { |
| if (logs.get(topicAndPartition).config.compact) { |
| val checkpoint = checkpoints(dataDir) |
| val existing = checkpoint.read() |
| |
| if (existing.getOrElse(topicAndPartition, 0L) > offset) |
| checkpoint.write(existing + (topicAndPartition -> offset)) |
| } |
| } |
| } |
| |
| /** |
| * Save out the endOffset and remove the given log from the in-progress set, if not aborted. |
| */ |
| def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) { |
| inLock(lock) { |
| inProgress(topicAndPartition) match { |
| case LogCleaningInProgress => |
| updateCheckpoints(dataDir,Option(topicAndPartition, endOffset)) |
| inProgress.remove(topicAndPartition) |
| case LogCleaningAborted => |
| inProgress.put(topicAndPartition, LogCleaningPaused) |
| pausedCleaningCond.signalAll() |
| case s => |
| throw new IllegalStateException(s"In-progress partition $topicAndPartition cannot be in $s state.") |
| } |
| } |
| } |
| |
| def doneDeleting(topicAndPartition: TopicAndPartition): Unit = { |
| inLock(lock) { |
| inProgress.remove(topicAndPartition) |
| } |
| } |
| } |
| |
private[log] object LogCleanerManager extends Logging {

  /**
   * @return true iff the log has both compaction and deletion enabled in its config.
   */
  def isCompactAndDelete(log: Log): Boolean = {
    log.config.compact && log.config.delete
  }


  /**
   * Returns the range of dirty offsets that can be cleaned.
   *
   * @param log the log
   * @param topicAndPartition the topic and partition of the log, used to look up its checkpoint entry
   * @param lastClean the map of checkpointed offsets
   * @param now the current time in milliseconds of the cleaning operation
   * @return the lower (inclusive) and upper (exclusive) offsets
   */
  def cleanableOffsets(log: Log, topicAndPartition: TopicAndPartition, lastClean: immutable.Map[TopicAndPartition, Long], now: Long): (Long, Long) = {

    // the checkpointed offset, ie., the first offset of the next dirty segment
    val lastCleanOffset: Option[Long] = lastClean.get(topicAndPartition)

    // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
    // reset to the log starting offset and log the error
    val logStartOffset = log.logSegments.head.baseOffset
    val firstDirtyOffset = {
      // with no checkpoint entry, cleaning starts from the beginning of the log
      val offset = lastCleanOffset.getOrElse(logStartOffset)
      if (offset < logStartOffset) {
        // don't bother with the warning if compact and delete are enabled.
        if (!isCompactAndDelete(log))
          warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
        logStartOffset
      } else {
        offset
      }
    }

    // dirty log segments, excluding the active segment (which is never cleaned)
    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray

    // negative configured lag is treated as no lag at all
    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)

    // find first segment that cannot be cleaned
    // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
    // may be cleaned
    // (the result is the minimum of the candidate boundaries below)
    val firstUncleanableDirtyOffset: Long = Seq (

      // the active segment is always uncleanable
      Option(log.activeSegment.baseOffset),

      // the first segment whose largest message timestamp is within a minimum time lag from now
      if (compactionLagMs > 0) {
        dirtyNonActiveSegments.find {
          s =>
            val isUncleanable = s.largestTimestamp > now - compactionLagMs
            debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
            isUncleanable
        } map(_.baseOffset)
      } else None
    ).flatten.min

    debug(s"Finding range of cleanable offsets for log=${log.name} topicAndPartition=$topicAndPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")

    (firstDirtyOffset, firstUncleanableDirtyOffset)
  }
}