/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io._
import kafka.utils._
import scala.actors.Actor
import scala.collection._
import java.util.concurrent.CountDownLatch
import kafka.server.{KafkaConfig, KafkaZooKeeper}
import kafka.common.{InvalidTopicException, InvalidPartitionException}
import kafka.api.OffsetRequest
/**
* The component that creates and hands out logs
*/
@threadsafe
private[kafka] class LogManager(val config: KafkaConfig,
private val scheduler: KafkaScheduler,
private val time: Time,
val logRollDefaultIntervalMs: Long,
val logCleanupIntervalMs: Long,
val logCleanupDefaultAgeMs: Long,
needRecovery: Boolean) extends Logging {
val logDir: File = new File(config.logDir)
private val numPartitions = config.numPartitions
private val logFileSizeMap = config.logFileSizeMap
private val flushInterval = config.flushInterval
private val topicPartitionsMap = config.topicPartitionsMap
private val logCreationLock = new Object
private val random = new java.util.Random
private var kafkaZookeeper: KafkaZooKeeper = null
private var zkActor: Actor = null
private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
private val topicNameValidator = new TopicNameValidator(config)
private val logFlushIntervalMap = config.flushIntervalMap
private val logRetentionSizeMap = config.logRetentionSizeMap
private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
private val logRollMsMap = getMsMap(config.logRollHoursMap)
/* Initialize a log for each subdirectory of the main log directory */
private val logs = new Pool[String, Pool[Int, Log]]()
if(!logDir.exists()) {
info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
logDir.mkdirs()
}
if(!logDir.isDirectory() || !logDir.canRead())
throw new IllegalArgumentException(logDir.getAbsolutePath() + " is not a readable log directory.")
val subDirs = logDir.listFiles()
if(subDirs != null) {
for(dir <- subDirs) {
if(!dir.isDirectory()) {
warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
} else {
info("Loading log '" + dir.getName() + "'")
val topicPartition = Utils.getTopicPartition(dir.getName)
val topic = topicPartition._1
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery)
logs.putIfNotExists(topic, new Pool[Int, Log]())
val parts = logs.get(topic)
parts.put(topicPartition._2, log)
}
}
}
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("starting log cleaner every " + logCleanupIntervalMs + " ms")
scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
}
if(config.enableZookeeper) {
kafkaZookeeper = new KafkaZooKeeper(config, this)
kafkaZookeeper.startup
zkActor = new Actor {
def act() {
loop {
receive {
case topic: String =>
try {
kafkaZookeeper.registerTopicInZk(topic)
}
catch {
case e => error("Error registering topic " + topic + " in ZooKeeper", e) // log it and let it go
}
case StopActor =>
info("zkActor stopped")
exit
}
}
}
}
zkActor.start
}
case object StopActor
private def getMsMap(hoursMap: Map[String, Int]) : Map[String, Long] = {
val ret = new mutable.HashMap[String, Long]
for ( (topic, hour) <- hoursMap ) {
ret.put(topic, hour * 60 * 60 * 1000L)
}
ret
}
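/*
* A small worked example of the hours-to-milliseconds conversion above (topic names and
* values are illustrative only):
*
*   getMsMap(Map("orders" -> 24, "clicks" -> 1))
*   // => Map("orders" -> 86400000L, "clicks" -> 3600000L)
*/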
/**
* Register this broker and all existing topics in ZK (if ZooKeeper is enabled), then start the periodic log flusher.
*/
def startup() {
if(config.enableZookeeper) {
kafkaZookeeper.registerBrokerInZk()
for (topic <- getAllTopics)
kafkaZookeeper.registerTopicInZk(topic)
startupLatch.countDown
}
info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
}
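/*
* A rough sketch of how a broker could construct and start a LogManager. The scheduler
* wiring, the use of SystemTime and the literal intervals below are illustrative
* assumptions, not the exact KafkaServer startup code:
*
*   val cleanupScheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
*   val logManager = new LogManager(config, cleanupScheduler, SystemTime,
*                                   logRollDefaultIntervalMs = 24 * 60 * 60 * 1000L,
*                                   logCleanupIntervalMs = 10 * 60 * 1000L,
*                                   logCleanupDefaultAgeMs = 7 * 24 * 60 * 60 * 1000L,
*                                   needRecovery = true)
*   logManager.startup()
*/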
private def awaitStartup() {
if (config.enableZookeeper)
startupLatch.await
}
private def registerNewTopicInZK(topic: String) {
if (config.enableZookeeper)
zkActor ! topic
}
/**
* Create a log for the given topic and the given partition
*/
private def createLog(topic: String, partition: Int): Log = {
logCreationLock synchronized {
val d = new File(logDir, topic + "-" + partition)
d.mkdirs()
val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
new Log(d, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false)
}
}
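/*
* For example, createLog("orders", 3) creates (or reuses) the directory
* <logDir>/orders-3 and opens a Log rooted there; this "topic-partition" directory name
* is what Utils.getTopicPartition parses back out when logs are reloaded at startup.
* ("orders" is just an illustrative topic name.)
*/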
/**
* Return the pool of partition logs for the given topic, after validating the topic name and partition id
*/
private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
awaitStartup
topicNameValidator.validate(topic)
if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions)) {
warn("Wrong partition " + partition + " valid partitions (0," +
(topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
throw new InvalidPartitionException("wrong partition " + partition)
}
logs.get(topic)
}
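/*
* Illustrative behaviour of the validation above, assuming a topic "orders" configured
* with 4 partitions: getLogPool("orders", 2) returns the partition pool for "orders"
* (possibly null if no log has been created yet), while getLogPool("orders", 4) throws
* InvalidPartitionException because the valid partitions are 0 to 3.
*/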
/**
* Pick a random partition from the given topic
*/
def chooseRandomPartition(topic: String): Int = {
random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
}
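/*
* With the same illustrative configuration (4 partitions for "orders"),
* chooseRandomPartition("orders") returns a uniformly random partition id in 0 to 3.
* Unlike getLogPool, no topic name validation is performed here.
*/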
def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
val log = getLog(offsetRequest.topic, offsetRequest.partition)
if (log != null) return log.getOffsetsBefore(offsetRequest)
Log.getEmptyOffsets(offsetRequest)
}
/**
* Get the log for the given topic and partition if it exists; return null otherwise
*/
def getLog(topic: String, partition: Int): Log = {
val parts = getLogPool(topic, partition)
if (parts == null) return null
parts.get(partition)
}
/**
* Create the log for the given topic and partition if it does not exist; otherwise just return it
*/
def getOrCreateLog(topic: String, partition: Int): Log = {
var hasNewTopic = false
var parts = getLogPool(topic, partition)
if (parts == null) {
val found = logs.putIfNotExists(topic, new Pool[Int, Log])
if (found == null)
hasNewTopic = true
parts = logs.get(topic)
}
var log = parts.get(partition)
if(log == null) {
log = createLog(topic, partition)
val found = parts.putIfNotExists(partition, log)
if(found != null) {
// there was already somebody there
log.close()
log = found
}
else
info("Created log for '" + topic + "'-" + partition)
}
if (hasNewTopic)
registerNewTopicInZK(topic)
log
}
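/*
* A minimal usage sketch of the produce path (the logManager instance, topic name and
* messageSet value are illustrative assumptions): the first call for a topic creates the
* on-disk directory and triggers ZK registration, later calls return the existing Log.
*
*   val log = logManager.getOrCreateLog("orders", logManager.chooseRandomPartition("orders"))
*   log.append(messageSet)   // messageSet: a MessageSet built by the caller
*/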
/* Attempts to delete all given segments from a log and returns the number it was actually able to delete */
private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
var total = 0
for(segment <- segments) {
info("Deleting log segment " + segment.file.getName() + " from " + log.name)
Utils.swallow(logger.warn, segment.messageSet.close())
if(!segment.file.delete()) {
warn("Delete failed.")
} else {
total += 1
}
}
total
}
/* Runs through the log removing segments older than the configured retention age */
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = Utils.getTopicPartition(log.dir.getName)._1
val logCleanupThresholdMS = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
val total = deleteSegments(log, toBeDeleted)
total
}
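/*
* Worked example (illustrative numbers): with logRetentionMsMap giving "orders" a 7-day
* retention (604800000 ms), a segment whose file was last modified 8 days before startMs
* satisfies startMs - lastModified > 604800000 and is marked for deletion, while a
* 6-day-old segment is kept.
*/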
/**
* Runs through the log removing segments until the size of the log
* is no more than logRetentionSize bytes
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = Utils.getTopicPartition(log.dir.getName)._1
val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}
val toBeDeleted = log.markDeletedWhile( shouldDelete )
val total = deleteSegments(log, toBeDeleted)
total
}
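/*
* Worked example (illustrative numbers): with maxLogRetentionSize = 1073741824 (1 GB)
* and a log of three 536870912-byte (512 MB) segments, diff starts at 512 MB. The first
* segment offered to shouldDelete passes (512 MB - 512 MB >= 0) and diff drops to 0;
* every later segment fails the check, so exactly one segment is deleted and 1 GB of
* log remains.
*/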
/**
* Delete any eligible log segments. The total number of segments deleted is logged when cleanup completes.
*/
def cleanupLogs() {
debug("Beginning log cleanup...")
val iter = getLogIterator
var total = 0
val startMs = time.milliseconds
while(iter.hasNext) {
val log = iter.next
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
/**
* Close all the logs
*/
def close() {
logFlusherScheduler.shutdown()
val iter = getLogIterator
while(iter.hasNext)
iter.next.close()
if (config.enableZookeeper) {
zkActor ! StopActor
kafkaZookeeper.close
}
}
private def getLogIterator(): Iterator[Log] = {
new IteratorTemplate[Log] {
val partsIter = logs.values.iterator
var logIter: Iterator[Log] = null
override def makeNext(): Log = {
while (true) {
if (logIter != null && logIter.hasNext)
return logIter.next
if (!partsIter.hasNext)
return allDone
logIter = partsIter.next.values.iterator
}
// should never reach here
assert(false)
return allDone
}
}
}
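/*
* The iterator above flattens the nested topic -> (partition -> Log) pools into a single
* stream of Log instances; conceptually it is equivalent to logs.values.flatMap(_.values),
* but built lazily on top of IteratorTemplate.
*/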
private def flushAllLogs() = {
debug("flushing the high watermark of all logs")
for (log <- getLogIterator) {
try {
val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
var logFlushInterval = config.defaultFlushIntervalMs
if(logFlushIntervalMap.contains(log.getTopicName))
logFlushInterval = logFlushIntervalMap(log.getTopicName)
debug(log.getTopicName + " flush interval " + logFlushInterval +
" last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= logFlushInterval)
log.flush
}
catch {
case e =>
error("Error flushing topic " + log.getTopicName, e)
e match {
case _: IOException =>
fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
Runtime.getRuntime.halt(1)
case _ =>
}
}
}
}
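/*
* Illustrative flush decision (numbers are examples only): with
* config.defaultFlushIntervalMs = 3000 and logFlushIntervalMap = Map("orders" -> 1000),
* an "orders" log last flushed 1500 ms ago is flushed on this pass, while a topic with no
* override and the same age is left alone until 3000 ms have elapsed.
*/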
def getAllTopics(): Iterator[String] = logs.keys.iterator
def getTopicPartitionsMap() = topicPartitionsMap
}