blob: 95ca8dbb9ea8b199c204df89b3a63f9d67e0f21d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io._
import java.util.concurrent.TimeUnit
import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
* All read and write operations are delegated to the individual log instances.
*
* The log manager maintains logs in one or more directories. New logs are created in the data directory
* with the fewest logs. No attempt is made to move partitions after the fact or balance based on
* size or I/O rate.
*
* A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
loadLogs()
// public, so we can access this from kafka.admin.DeleteTopicTest
val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
new LogCleaner(cleanerConfig, logDirs, logs, time = time)
else
null
/**
* Create and check validity of the given directories, specifically:
* <ol>
* <li> Ensure that there are no duplicates in the directory list
* <li> Create each directory if it doesn't exist
* <li> Check that each path is a readable directory
* </ol>
*/
private def createAndValidateLogDirs(dirs: Seq[File]) {
if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
for(dir <- dirs) {
if(!dir.exists) {
info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
val created = dir.mkdirs()
if(!created)
throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
}
if(!dir.isDirectory || !dir.canRead)
throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
}
}
/**
* Lock all the given directories
*/
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
dirs.map { dir =>
val lock = new FileLock(new File(dir, LockFile))
if(!lock.tryLock())
throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
". A Kafka instance in another process or thread is using this directory.")
lock
}
}
/**
* Recover and load all logs in the given data directories
*/
private def loadLogs(): Unit = {
info("Loading logs.")
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- this.logDirs) {
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
if (cleanShutdownFile.exists) {
debug(
"Found clean shutdown file. " +
"Skipping recovery for all logs in data directory: " +
dir.getAbsolutePath)
} else {
// log recovery itself is being performed by `Log` class during initialization
brokerState.newState(RecoveringFromUncleanShutdown)
}
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
Utils.runnable {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir.getName)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
val previous = this.logs.put(topicPartition, current)
if (previous != null) {
throw new IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(
current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
}
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
}
try {
for ((cleanShutdownFile, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
cleanShutdownFile.delete()
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during logs loading: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
}
info("Logs loading complete.")
}
/**
* Start the background threads to flush logs and do log cleanup
*/
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
/**
* Close all the logs
*/
def shutdown() {
info("Shutting down.")
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
// stop the cleaner first
if (cleaner != null) {
Utils.swallow(cleaner.shutdown())
}
// close logs in each dir
for (dir <- this.logDirs) {
debug("Flushing and closing logs at " + dir)
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
val jobsForDir = logsInDir map { log =>
Utils.runnable {
// flush the log to ensure latest possible recovery point
log.flush()
log.close()
}
}
jobs(dir) = jobsForDir.map(pool.submit).toSeq
}
try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
// update the last flush point
debug("Updating recovery points at " + dir)
checkpointLogsInDir(dir)
// mark that the shutdown was clean by creating marker file
debug("Writing clean shutdown marker at " + dir)
Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
// regardless of whether the close succeeded, we need to unlock the data directories
dirLocks.foreach(_.destroy())
}
info("Shutdown complete.")
}
/**
* Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
*
* @param partitionAndOffsets Partition logs that need to be truncated
*/
def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
val log = logs.get(topicAndPartition)
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
if (needToStopCleaner && cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateTo(truncateOffset)
if (needToStopCleaner && cleaner != null)
cleaner.resumeCleaning(topicAndPartition)
}
}
checkpointRecoveryPointOffsets()
}
/**
* Delete all data in a partition and start the log at the new offset
* @param newOffset The new offset to start the log with
*/
def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) {
val log = logs.get(topicAndPartition)
// If the log does not exist, skip it
if (log != null) {
//Abort and pause the cleaning of the log, and resume after truncation is done.
if (cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateFullyAndStartAt(newOffset)
if (cleaner != null)
cleaner.resumeCleaning(topicAndPartition)
}
checkpointRecoveryPointOffsets()
}
/**
* Write out the current recovery point for all logs to a text file in the log directory
* to avoid recovering the whole log on startup.
*/
def checkpointRecoveryPointOffsets() {
this.logDirs.foreach(checkpointLogsInDir)
}
/**
* Make a checkpoint for all logs in provided directory.
*/
private def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}
/**
* Get the log if it exists, otherwise return None
*/
def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
val log = logs.get(topicAndPartition)
if (log == null)
None
else
Some(log)
}
/**
* Create a log for the given topic and the given partition
* If the log already exists, just return a copy of the existing log
*/
def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
logCreationOrDeletionLock synchronized {
var log = logs.get(topicAndPartition)
// check if the log has already been created in another thread
if(log != null)
return log
// if not, create it
val dataDir = nextLogDir()
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
log = new Log(dir,
config,
recoveryPoint = 0L,
scheduler,
time)
logs.put(topicAndPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicAndPartition.topic,
topicAndPartition.partition,
dataDir.getAbsolutePath,
{import JavaConversions._; config.toProps.mkString(", ")}))
log
}
}
/**
* Delete a log.
*/
def deleteLog(topicAndPartition: TopicAndPartition) {
var removedLog: Log = null
logCreationOrDeletionLock synchronized {
removedLog = logs.remove(topicAndPartition)
}
if (removedLog != null) {
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null) {
cleaner.abortCleaning(topicAndPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
removedLog.delete()
info("Deleted log for partition [%s,%d] in %s."
.format(topicAndPartition.topic,
topicAndPartition.partition,
removedLog.dir.getAbsolutePath))
}
}
/**
* Choose the next directory in which to create a log. Currently this is done
* by calculating the number of partitions in each directory and then choosing the
* data directory with the fewest partitions.
*/
private def nextLogDir(): File = {
if(logDirs.size == 1) {
logDirs(0)
} else {
// count the number of logs in each parent directory (including 0 for empty directories
val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
var dirCounts = (zeros ++ logCounts).toBuffer
// choose the directory with the least logs in it
val leastLoaded = dirCounts.sortBy(_._2).head
new File(leastLoaded._1)
}
}
/**
* Runs through the log removing segments older than a certain age
*/
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
/**
* Runs through the log removing segments until the size of the log
* is at least logRetentionSize bytes in size
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
return 0
var diff = log.size - log.config.retentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}
log.deleteOldSegments(shouldDelete)
}
/**
* Delete any eligible logs. Return the number of segments deleted.
*/
def cleanupLogs() {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
for(log <- allLogs; if !log.config.compact) {
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
/**
* Get all the partition logs
*/
def allLogs(): Iterable[Log] = logs.values
/**
* Get a map of TopicAndPartition => Log
*/
def logsByTopicPartition = logs.toMap
/**
* Map of log dir to logs by topic and partitions in that dir
*/
private def logsByDir = {
this.logsByTopicPartition.groupBy {
case (_, log) => log.dir.getParent
}
}
/**
* Flush any log which has exceeded its flush interval and has unwritten messages.
*/
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
for ((topicAndPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e: Throwable =>
error("Error flushing topic " + topicAndPartition.topic, e)
}
}
}
}