blob: b3e6e72c3e3969f7eae51f4f228a88cc30d1cb22 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.OffsetCheckpoint
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool, Time}
import scala.collection.{immutable, mutable}
// State machine for a partition undergoing log cleaning; see the
// LogCleanerManager scaladoc below for the full transition diagram.
private[log] sealed trait LogCleaningState
// A cleaner thread is actively compacting this partition.
private[log] case object LogCleaningInProgress extends LogCleaningState
// An abort has been requested; the cleaner thread will notice and stop.
private[log] case object LogCleaningAborted extends LogCleaningState
// Cleaning is suspended; the partition will not be scheduled until resumed.
private[log] case object LogCleaningPaused extends LogCleaningState
/**
* Manage the state of each partition being cleaned.
* If a partition is to be cleaned, it enters the LogCleaningInProgress state.
* While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
* the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
* While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
* requested to be resumed.
*/
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {

  import LogCleanerManager._

  // Report under the LogCleaner logger so all cleaner messages are grouped together.
  override val loggerName = classOf[LogCleaner].getName

  // package-private for testing
  private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"

  /* the offset checkpoints holding the last cleaned point for each log, one checkpoint file per log directory */
  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap

  /* the set of logs currently being cleaned, keyed by partition, with their cleaning state */
  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()

  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
  private val lock = new ReentrantLock

  /* for coordinating the pausing and the cleaning of a partition (signalled in doneCleaning) */
  private val pausedCleaningCond = lock.newCondition()

  /* a gauge for tracking the cleanable ratio of the dirtiest log */
  @volatile private var dirtiestLogCleanableRatio = 0.0
  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })

  /**
   * @return the position processed for all logs.
   */
  def allCleanerCheckpoints: Map[TopicAndPartition, Long] =
    checkpoints.values.flatMap(_.read()).toMap

  /**
   * Choose the log to clean next and add it to the in-progress set. We recompute this
   * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
   * the log manager maintains.
   *
   * @param time the time source used to evaluate the compaction lag
   * @return the dirtiest cleanable log, if any log currently qualifies for cleaning
   */
  def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
    inLock(lock) {
      val now = time.milliseconds
      val lastClean = allCleanerCheckpoints
      val dirtyLogs = logs.filter {
        case (_, log) => log.config.compact // match logs that are marked as compacted
      }.filterNot {
        case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
      }.map {
        case (topicAndPartition, log) => // create a LogToClean instance for each
          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicAndPartition,
            lastClean, now)
          LogToClean(topicAndPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs

      // update the gauge even when nothing ends up being cleanable
      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
      // and must meet the minimum threshold for dirty byte ratio
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if (cleanableLogs.isEmpty) {
        None
      } else {
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }

  /**
   * Find any logs that have compact and delete enabled
   * and mark them as in-progress so they are not concurrently scheduled for compaction.
   */
  def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
    inLock(lock) {
      val toClean = logs.filterNot {
        case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
      }.filter {
        case (topicAndPartition, log) => isCompactAndDelete(log)
      }
      toClean.foreach { x => inProgress.put(x._1, LogCleaningInProgress) }
      toClean
    }
  }

  /**
   * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
   * the partition is aborted.
   * This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
   */
  def abortCleaning(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      abortAndPauseCleaning(topicAndPartition)
      resumeCleaning(topicAndPartition)
    }
    info(s"The cleaning for partition $topicAndPartition is aborted")
  }

  /**
   * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
   * This call blocks until the cleaning of the partition is aborted and paused.
   * 1. If the partition is not in progress, mark it as paused.
   * 2. Otherwise, first mark the state of the partition as aborted.
   * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
   *    throws a LogCleaningAbortedException to stop the cleaning task.
   * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
   * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
   */
  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      inProgress.get(topicAndPartition) match {
        case None =>
          inProgress.put(topicAndPartition, LogCleaningPaused)
        case Some(state) =>
          state match {
            case LogCleaningInProgress =>
              inProgress.put(topicAndPartition, LogCleaningAborted)
            case s =>
              throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be aborted and paused since it is in $s state.")
          }
      }
      // poll rather than wait indefinitely so a missed signal cannot hang this thread forever
      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
        pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
    }
    info(s"The cleaning for partition $topicAndPartition is aborted and paused")
  }

  /**
   * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
   */
  def resumeCleaning(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      inProgress.get(topicAndPartition) match {
        case None =>
          throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is not paused.")
        case Some(state) =>
          state match {
            case LogCleaningPaused =>
              inProgress.remove(topicAndPartition)
            case s =>
              throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is in $s state.")
          }
      }
    }
    info(s"Compaction for partition $topicAndPartition is resumed")
  }

  /**
   * Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
   */
  private def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean =
    inProgress.get(topicAndPartition).exists(_ == expectedState)

  /**
   * Check if the cleaning for a partition is aborted. If so, throw an exception.
   */
  def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      if (isCleaningInState(topicAndPartition, LogCleaningAborted))
        throw new LogCleaningAbortedException()
    }
  }

  /**
   * Rewrite the checkpoint file for the given data directory, dropping partitions that no
   * longer exist in the log pool and applying the given update, if any.
   */
  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition, Long)]) {
    inLock(lock) {
      val checkpoint = checkpoints(dataDir)
      // keep only entries for partitions we still manage, then overlay the update
      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
      checkpoint.write(existing)
    }
  }

  /**
   * Lower the checkpointed cleaned offset for the partition to `offset` if the current
   * checkpoint is beyond it (e.g. after a log truncation).
   */
  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
    inLock(lock) {
      // NOTE(review): logs.get may return null for an unknown partition — presumably callers
      // only pass partitions present in the pool; verify against callers.
      if (logs.get(topicAndPartition).config.compact) {
        val checkpoint = checkpoints(dataDir)
        val existing = checkpoint.read()
        if (existing.getOrElse(topicAndPartition, 0L) > offset)
          checkpoint.write(existing + (topicAndPartition -> offset))
      }
    }
  }

  /**
   * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
   * If the cleaning was aborted, transition to paused and wake up any thread blocked in
   * abortAndPauseCleaning().
   */
  def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
    inLock(lock) {
      inProgress(topicAndPartition) match {
        case LogCleaningInProgress =>
          updateCheckpoints(dataDir, Option(topicAndPartition, endOffset))
          inProgress.remove(topicAndPartition)
        case LogCleaningAborted =>
          inProgress.put(topicAndPartition, LogCleaningPaused)
          pausedCleaningCond.signalAll()
        case s =>
          throw new IllegalStateException(s"In-progress partition $topicAndPartition cannot be in $s state.")
      }
    }
  }

  /**
   * Remove the given partition from the in-progress set after a deletion-style cleaning pass.
   */
  def doneDeleting(topicAndPartition: TopicAndPartition): Unit = {
    inLock(lock) {
      inProgress.remove(topicAndPartition)
    }
  }
}
private[log] object LogCleanerManager extends Logging {

  /**
   * Whether this log has both compaction and deletion enabled in its config.
   */
  def isCompactAndDelete(log: Log): Boolean =
    log.config.compact && log.config.delete

  /**
   * Returns the range of dirty offsets that can be cleaned.
   *
   * @param log the log
   * @param topicAndPartition the topic and partition of the log
   * @param lastClean the map of checkpointed offsets
   * @param now the current time in milliseconds of the cleaning operation
   * @return the lower (inclusive) and upper (exclusive) offsets
   */
  def cleanableOffsets(log: Log, topicAndPartition: TopicAndPartition, lastClean: immutable.Map[TopicAndPartition, Long], now: Long): (Long, Long) = {
    // the checkpointed offset, i.e. the first offset of the next dirty segment
    val checkpointedOffset: Option[Long] = lastClean.get(topicAndPartition)
    val logStartOffset = log.logSegments.head.baseOffset

    // A checkpointed offset below the current log start offset means the log was abnormally
    // truncated since the checkpoint was written; fall back to the log start offset.
    val firstDirtyOffset = checkpointedOffset match {
      case Some(checkpointed) if checkpointed >= logStartOffset =>
        checkpointed
      case Some(checkpointed) =>
        // don't bother with the warning if compact and delete are enabled.
        if (!isCompactAndDelete(log))
          warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $checkpointed is invalid.")
        logStartOffset
      case None =>
        logStartOffset
    }

    // dirty, non-active segments are the cleaning candidates
    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray
    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
    val activeSegmentBase = log.activeSegment.baseOffset

    // When a compaction lag is configured, the first segment holding any message newer than
    // (now - lag) bounds cleaning from above.
    val lagBoundedOffset: Option[Long] =
      if (compactionLagMs > 0) {
        dirtyNonActiveSegments.find { s =>
          val isUncleanable = s.largestTimestamp > now - compactionLagMs
          debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
          isUncleanable
        }.map(_.baseOffset)
      } else {
        None
      }

    // The active segment is never cleanable; take the tighter of the two bounds.
    val firstUncleanableDirtyOffset: Long = lagBoundedOffset match {
      case Some(bounded) => math.min(bounded, activeSegmentBase)
      case None => activeSegmentBase
    }

    debug(s"Finding range of cleanable offsets for log=${log.name} topicAndPartition=$topicAndPartition. Last clean offset=$checkpointedOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=$activeSegmentBase")
    (firstDirtyOffset, firstUncleanableDirtyOffset)
  }
}