package kafka.log
import kafka.api.KAFKA_0_10_0_IV0
import kafka.utils._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
import{File, IOException}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import java.text.NumberFormat
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetRequest
import scala.collection.Seq
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.{Time, Utils}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import org.apache.kafka.common.TopicPartition
import java.util.regex.Pattern
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP,
NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
* Struct to hold various quantities we compute about each message set before appending to the log
* @param firstOffset The first offset in the message set
* @param lastOffset The last offset in the message set
* @param maxTimestamp The maximum timestamp of the message set.
* @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
* @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param sourceCodec The source codec used in the message set (send by the producer)
* @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
case class LogAppendInfo(var firstOffset: Long,
var lastOffset: Long,
var maxTimestamp: Long,
var offsetOfMaxTimestamp: Long,
var logAppendTime: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
shallowCount: Int,
validBytes: Int,
offsetsMonotonic: Boolean)
* An append-only log for storing messages.
* The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
* New log segments are created according to a configurable policy that controls the size in bytes or time interval
* for a given segment.
* @param dir The directory in which log segments are created.
* @param config The log configuration settings
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
/* A lock that guards all modifications to the log */
private val lock = new Object
/* last time it was flushed */
private val lastflushedTime = new AtomicLong(time.milliseconds)
def initFileSize() : Int = {
if (config.preallocate)
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
locally {
val startMs = time.milliseconds
/* Calculate the offset of the next message */
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
.format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
new Gauge[Int] {
def value = numberOfSegments
new Gauge[Long] {
def value = logStartOffset
new Gauge[Long] {
def value = logEndOffset
new Gauge[Long] {
def value = size
/** The name of this log */
def name = dir.getName()
/* Load the log segments from the log files on disk */
private def loadSegments() {
// create the log directory if it doesn't exist
var swapFiles = Set[File]()
// first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
for(file <- dir.listFiles if file.isFile) {
throw new IOException("Could not read file " + file)
val filename = file.getName
if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
// if the file ends in .deleted or .cleaned, delete it
} else if(filename.endsWith(SwapFileSuffix)) {
// we crashed in the middle of a swap operation, to recover:
// if a log, delete the .index file, complete the swap operation later
// if an index just delete it, it will be rebuilt
val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
if(baseName.getPath.endsWith(IndexFileSuffix)) {
} else if(baseName.getPath.endsWith(LogFileSuffix)){
// delete the index
val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
swapFiles += file
// now do a second pass and load all the .log and all index files
for(file <- dir.listFiles if file.isFile) {
val filename = file.getName
if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
// if it is an index file, make sure it has a corresponding .log file
val logFile =
if (filename.endsWith(TimeIndexFileSuffix))
new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
if(!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
} else if(filename.endsWith(LogFileSuffix)) {
// if its a log file, load the corresponding log segment
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val indexFile = Log.indexFilename(dir, start)
val timeIndexFile = Log.timeIndexFilename(dir, start)
val indexFileExists = indexFile.exists()
val timeIndexFileExists = timeIndexFile.exists()
val segment = new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = true)
if (indexFileExists) {
try {
// Resize the time index file to 0 if it is newly created.
if (!timeIndexFileExists)
} catch {
case e: java.lang.IllegalArgumentException =>
warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
s"${indexFile.getAbsolutePath} and rebuilding index...")
} else {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segments.put(start, segment)
// Finally, complete any interrupted swap operations. To be crash-safe,
// log files that are replaced by the swap segment should be renamed to .deleted
// before the swap file is restored as the new segment file.
for (swapFile <- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
val fileName = logFile.getName
val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val swapSegment = new LogSegment(,
index = index,
timeIndex = timeIndex,
baseOffset = startOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
time = time)
info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
if(logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at offset 0
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize(),
preallocate = config.preallocate))
} else {
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
// reset the index size of the currently active log segment to allow more entries
private def updateLogEndOffset(messageOffset: Long) {
nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
private def recoverLog() {
// if we have the clean shutdown marker, skip recovery
if(hasCleanShutdownFile) {
this.recoveryPoint = activeSegment.nextOffset
// okay we need to actually recovery this log
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
val curr =
info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
val truncatedBytes =
try {
} catch {
case _: InvalidOffsetException =>
val startOffset = curr.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
if(truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
* Check if we have the "clean shutdown" file
private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
* The number of segments in the log.
* Take care! this is an O(n) operation.
def numberOfSegments: Int = segments.size
* Close this log
def close() {
debug("Closing log " + name)
lock synchronized {
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
* This method will generally be responsible for assigning offsets to the messages,
* however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
* @param records The log records to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateRecords(records)
// if we have any valid messages, append them to the log
if (appendInfo.shallowCount == 0)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validRecords = trimInvalidBytes(records, appendInfo)
try {
// they are valid, insert them in the log
lock synchronized {
if (assignOffsets) {
// assign offsets to the message set
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = offset.value
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (logEntry <- validRecords.shallowEntries.asScala) {
if (logEntry.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(logEntry.sizeInBytes, config.maxMessageSize))
} else {
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " +
// check messages set size may be exceed config.segmentSize
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
.format(validRecords.sizeInBytes, config.segmentSize))
// maybe roll the log if this segment is full
val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
maxTimestampInMessages = appendInfo.maxTimestamp,
maxOffsetInMessages = appendInfo.lastOffset)
// now append to the log
segment.append(firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
if (unflushedMessages >= config.flushInterval)
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
* Validate the following:
* <ol>
* <li> each message matches its CRC
* <li> each message size is valid
* </ol>
* Also compute the following quantities:
* <ol>
* <li> First offset in the message set
* <li> Last offset in the message set
* <li> Number of messages
* <li> Number of valid bytes
* <li> Whether the offsets are monotonically increasing
* <li> Whether any compression codec is used (if many are used, then the last one is given)
* </ol>
private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset = -1L
var lastOffset = -1L
var sourceCodec: CompressionCodec = NoCompressionCodec
var monotonic = true
var maxTimestamp = Record.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
for (entry <- records.shallowEntries.asScala) {
// update the first offset if on the first message
if(firstOffset < 0)
firstOffset = entry.offset
// check that offsets are monotonically increasing
if(lastOffset >= entry.offset)
monotonic = false
// update the last offset seen
lastOffset = entry.offset
val record = entry.record
// Check if the message sizes are valid.
val messageSize = entry.sizeInBytes
if(messageSize > config.maxMessageSize) {
throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(messageSize, config.maxMessageSize))
// check the validity of the message by checking CRC
if (record.timestamp > maxTimestamp) {
maxTimestamp = record.timestamp
offsetOfMaxTimestamp = lastOffset
shallowMessageCount += 1
validBytesCount += messageSize
val messageCodec = CompressionCodec.getCompressionCodec(
if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Record.NO_TIMESTAMP, sourceCodec,
targetCodec, shallowMessageCount, validBytesCount, monotonic)
* Trim any invalid bytes from the end of this message set (if there are any)
* @param records The records to trim
* @param info The general information of the message set
* @return A trimmed message set. This may be the same as what was passed in or it may not.
private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = {
val validBytes = info.validBytes
if (validBytes < 0)
throw new CorruptRecordException("Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
if (validBytes == records.sizeInBytes) {
} else {
// trim invalid bytes
val validByteBuffer = records.buffer.duplicate()
* Read messages from the log.
* @param startOffset The offset to begin reading at
* @param maxLength The maximum number of bytes to read
* @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
* @return The fetch data information including fetch starting offset metadata and messages read.
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// Because we don't use lock for reading, the synchronization is a little bit tricky.
// We create the local variables to avoid race conditions with updates to the log.
val currentNextOffsetMetadata = nextOffsetMetadata
val next = currentNextOffsetMetadata.messageOffset
if(startOffset == next)
return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
var entry = segments.floorEntry(startOffset)
// attempt to read beyond the log end offset is an error
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while(entry != null) {
// If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
// the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
// cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
// end of the active segment.
val maxPosition = {
if (entry == segments.lastEntry) {
val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
// Check the segment again in case a new segment has just rolled out.
if (entry != segments.lastEntry)
// New log segment has rolled out, we can read up to the file end.
} else {
val fetchInfo =, maxOffset, maxLength, maxPosition, minOneMessage)
if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
* Get an offset based on the given timestamp
* The offset returned is the offset of the first message whose timestamp is greater than or equals to the
* given timestamp.
* If no such message is found, the log end offset is returned.
* `NOTE:` OffsetRequest V0 does not use this method, the behavior of OffsetRequest V0 remains the same as before
* , i.e. it only gives back the timestamp based on the last modification time of the log segments.
* @param targetTimestamp The given timestamp for offset fetching.
* @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
* None if no such message is found.
def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
debug(s"Searching offset for timestamp $targetTimestamp")
if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
s"required version $KAFKA_0_10_0_IV0")
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset))
val targetSeg = {
// Get all the segments whose largest timestamp is smaller than target timestamp
val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
// We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
if (earlierSegs.length < segmentsCopy.length)
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, return unknown offset metadata
def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
try {
val fetchDataInfo = read(offset, 1)
} catch {
case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
* Delete any log segments matching the given predicate function,
* starting with the oldest segment and moving forward until a segment doesn't match.
* @param predicate A function that takes in a single log segment and returns true iff it is deletable
* @return The number of segments deleted
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first
if (segments.size == numToDelete)
// remove the segments for lookups
* Find segments starting from the oldest until the the user-supplied predicate is false.
* A final segment that is empty will never be returned (since we would just end up re-creating it).
* @param predicate A function that takes in a single log segment and returns true iff it is deletable
* @return the segments ready to be deleted
private def deletableSegments(predicate: LogSegment => Boolean) = {
val lastEntry = segments.lastEntry
if (lastEntry == null) Seq.empty
else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
* Delete any log segments that have either expired due to time based retention
* or because the log size is > retentionSize
def deleteOldSegments(): Int = {
if (!config.delete) return 0
deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
private def deleteRetenionMsBreachedSegments() : Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
private def deleteRetentionSizeBreachedSegments() : Int = {
if (config.retentionSize < 0 || size < config.retentionSize) return 0
var diff = size - config.retentionSize
def shouldDelete(segment: LogSegment) = {
if (diff - segment.size >= 0) {
diff -= segment.size
} else {
* The size of the log in bytes
def size: Long =
* The earliest message offset in the log
def logStartOffset: Long = logSegments.head.baseOffset
* The offset metadata of the next message that will be appended to the log
def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
* The offset of the next message that will be appended to the log
def logEndOffset: Long = nextOffsetMetadata.messageOffset
* Roll the log over to a new empty log segment if necessary.
* @param messagesSize The messages set size in bytes
* @param maxTimestampInMessages The maximum timestamp in the messages.
* logSegment will be rolled if one of the following conditions met
* <ol>
* <li> The logSegment is full
* <li> The maxTime has elapsed since the timestamp of first message in the segment (or since the create time if
* the first message does not have a timestamp)
* <li> The index is full
* </ol>
* @return The currently active segment after (perhaps) rolling to a new segment
private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
val segment = activeSegment
val now = time.milliseconds
val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
if (segment.size > config.segmentSize - messagesSize ||
(segment.size > 0 && reachedRollMs) ||
segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
maxOffsetInMessages - Integer.MAX_VALUE is a heuristic value for the first offset in the set of messages.
Since the offset in messages will not differ by more than Integer.MAX_VALUE, this is guaranteed <= the real
first offset in the set. Determining the true first offset in the set requires decompression, which the follower
is trying to avoid during log append. Prior behavior assigned new baseOffset = logEndOffset from old segment.
This was problematic in the case that two consecutive messages differed in offset by
Integer.MAX_VALUE.toLong + 2 or more. In this case, the prior behavior would roll a new log segment whose
base offset was too low to contain the next message. This edge case is possible when a replica is recovering a
highly compacted topic from scratch.
roll(maxOffsetInMessages - Integer.MAX_VALUE)
} else {
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
* @return The newly rolled segment
def roll(expectedNextOffset: Long = 0): LogSegment = {
val start = time.nanoseconds
lock synchronized {
val newOffset = Math.max(expectedNextOffset, logEndOffset)
val logFile = Log.logFile(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
val timeIndexFile = timeIndexFilename(dir, newOffset)
for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
segments.lastEntry() match {
case null =>
case entry => {
val seg = entry.getValue
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = initFileSize,
preallocate = config.preallocate)
val prev = addSegment(segment)
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
// We need to update the segment base offset and append position data of the metadata when log rolls.
// The next offset should not change.
// schedule an asynchronous flush of the old segment
scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
* The number of messages appended to the log since the last flush
def unflushedMessages() = this.logEndOffset - this.recoveryPoint
* Flush all log segments
def flush(): Unit = flush(this.logEndOffset)
* Flush log segments for all offsets up to offset-1
* @param offset The offset to flush up to (non-inclusive); the new recovery point
def flush(offset: Long) : Unit = {
if (offset <= this.recoveryPoint)
debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
time.milliseconds + " unflushed = " + unflushedMessages)
for(segment <- logSegments(this.recoveryPoint, offset))
lock synchronized {
if(offset > this.recoveryPoint) {
this.recoveryPoint = offset
* Completely delete this log directory and all contents from the file system with no delay
private[log] def delete() {
lock synchronized {
* Truncate this log so that it ends with the greatest offset < targetOffset.
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
private[log] def truncateTo(targetOffset: Long) {
info("Truncating log %s to offset %d.".format(name, targetOffset))
if(targetOffset < 0)
throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
if(targetOffset > logEndOffset) {
info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
lock synchronized {
if(segments.firstEntry.getValue.baseOffset > targetOffset) {
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
* Delete all data in the log and start at the new offset
* @param newOffset The new offset to start the log with
private[log] def truncateFullyAndStartAt(newOffset: Long) {
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val segmentsToDelete = logSegments.toList
addSegment(new LogSegment(dir,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time,
fileAlreadyExists = false,
initFileSize = initFileSize,
preallocate = config.preallocate))
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
* The time this log is last known to have been fully flushed to disk
def lastFlushTime(): Long = lastflushedTime.get
* The active segment that is currently taking appends
def activeSegment = segments.lastEntry.getValue
* All the log segments in this log ordered from oldest to newest
def logSegments: Iterable[LogSegment] = segments.values.asScala
* Get all segments beginning with the segment that includes "from" and ending with the segment
* that includes up to "to-1" or the end of the log (if to > logEndOffset)
def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
lock synchronized {
val floor = segments.floorKey(from)
if(floor eq null)
segments.subMap(floor, true, to, false).values.asScala
override def toString = "Log(" + dir + ")"
* This method performs an asynchronous log segment delete by doing the following:
* <ol>
* <li>It removes the segment from the segment map so that it will no longer be used for reads.
* <li>It renames the index and log files by appending .deleted to the respective file name
* <li>It schedules an asynchronous delete operation to occur in the future
* </ol>
* This allows reads to happen concurrently without synchronization and without the possibility of physically
* deleting a file while it is being read from.
* @param segment The log segment to schedule for deletion
private def deleteSegment(segment: LogSegment) {
info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
lock synchronized {
* Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
* @throws KafkaStorageException if the file can't be renamed and still exists
private def asyncDeleteSegment(segment: LogSegment) {
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
def deleteSeg() {
info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
* Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
* be asynchronously deleted.
* The sequence of operations is:
* <ol>
* <li> Cleaner creates new segment with suffix .cleaned and invokes replaceSegments().
* If broker crashes at this point, the clean-and-swap operation is aborted and
* the .cleaned file is deleted on recovery in loadSegments().
* <li> New segment is renamed .swap. If the broker crashes after this point before the whole
* operation is completed, the swap operation is resumed on recovery as described in the next step.
* <li> Old segment files are renamed to .deleted and asynchronous delete is scheduled.
* If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments().
* replaceSegments() is then invoked to complete the swap with newSegment recreated from
* the .swap file and oldSegments containing segments which were not renamed before the crash.
* <li> Swap segment is renamed to replace the existing segment, completing this operation.
* If the broker crashes, any .deleted files which may be left behind are deleted
* on recovery in loadSegments().
* </ol>
* @param newSegment The new log segment to add to the log
* @param oldSegments The old log segments to delete from the log
* @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile : Boolean = false) {
lock synchronized {
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
// delete the old files
for(seg <- oldSegments) {
// remove the index entry
if(seg.baseOffset != newSegment.baseOffset)
// delete segment
// okay we are safe now, remove the swap suffix
newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
* remove deleted log metrics
private[log] def removeLogMetrics(): Unit = {
removeMetric("NumLogSegments", tags)
removeMetric("LogStartOffset", tags)
removeMetric("LogEndOffset", tags)
removeMetric("Size", tags)
* Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
* @param segment The segment to add
def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
* Helper functions for logs
object Log {
/** a log file */
val LogFileSuffix = ".log"
/** an index file */
val IndexFileSuffix = ".index"
/** a time index file */
val TimeIndexFileSuffix = ".timeindex"
/** a file that is scheduled to be deleted */
val DeletedFileSuffix = ".deleted"
/** A temporary file that is being used for log cleaning */
val CleanedFileSuffix = ".cleaned"
/** A temporary file used when swapping files into the log */
val SwapFileSuffix = ".swap"
/** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility
* with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
/** TODO: Get rid of CleanShutdownFile in 0.8.2 */
val CleanShutdownFile = ".kafka_cleanshutdown"
/** a directory that is scheduled to be deleted */
val DeleteDirSuffix = "-delete"
private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
* @param offset The offset to use in the file name
* @return The filename
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
* Construct a log file name in the given dir with the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
def logFile(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
* Return a directory name to rename the log directory to for async deletion. The name will be in the following
* format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables.
def logDeleteDirName(logName: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
* Construct an index file name in the given dir using the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
* Construct a time index file name in the given dir using the given base offset
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
def timeIndexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
* Parse the topic and partition out of the directory name of a log
def parseTopicPartitionName(dir: File): TopicPartition = {
if (dir == null)
throw new KafkaException("dir should not be null")
def exception(dir: File): KafkaException = {
new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " +
"topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" +
"Kafka's log directories (and children) should only contain Kafka topic data.")
val dirName = dir.getName
if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
throw exception(dir)
if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches)
throw exception(dir)
val name: String =
if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
else dirName
val index = name.lastIndexOf('-')
val topic = name.substring(0, index)
val partitionString = name.substring(index + 1)
if (topic.isEmpty || partitionString.isEmpty)
throw exception(dir)
val partition =
try partitionString.toInt
catch { case _: NumberFormatException => throw exception(dir) }
new TopicPartition(topic, partition)