// blob: 9da56c185e003152f43642c358871ba4c21942de [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.log
import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.LogEntryPosition
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.math._
/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing
 * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
 * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
 * any previous segment. A TimeIndex additionally maps timestamps to offsets within the segment.
 *
 * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
 *
 * @param log The message set containing log entries
 * @param index The offset index
 * @param timeIndex The timestamp index
 * @param baseOffset A lower bound on the offsets in this segment
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param time The time instance
 */
// One segment of a log: the record file (`log`) together with its offset index and time index.
// NOTE(review): rollJitterMs is not read anywhere in the visible part of this class; presumably it is
// consumed by callers that decide when to roll the segment — confirm.
class LogSegment(val log: FileRecords,
val index: OffsetIndex,
val timeIndex: TimeIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
time: Time) extends Logging {
// Creation time taken from the injected clock. truncateTo resets it when the log file becomes empty, and
// timeWaitedForRoll falls back to it when no usable first-message timestamp is known.
private var created = time.milliseconds
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
/* The timestamp we used for time based log rolling */
// Set by append when writing at file position 0, lazily loaded by timeWaitedForRoll, cleared by truncateTo.
private var rollingBasedTimestamp: Option[Long] = None
/* The maximum timestamp we see so far */
// Both fields are seeded from the last entry of the time index and are declared @volatile.
@volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
// Offset paired with maxTimestampSoFar; the two are always updated together in this class.
@volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
/**
 * Convenience constructor that derives the segment's three backing files from a directory and a start offset
 * and then delegates to the primary constructor, using `startOffset` as the segment's base offset.
 *
 * @param dir The directory that holds (or will hold) the segment files
 * @param startOffset The base offset of the segment; also used to name the log, index and time index files
 * @param indexIntervalBytes The approximate number of bytes between entries in the index
 * @param maxIndexSize The maximum size passed to both the offset index and the time index
 * @param rollJitterMs Passed through unchanged to the primary constructor
 * @param time The time instance
 * @param fileAlreadyExists Forwarded to the log file opener
 * @param initFileSize Forwarded to the log file opener
 * @param preallocate Forwarded to the log file opener
 */
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
  // NOTE(review): the first argument was truncated in the reviewed text ("this(, startOffset), ...");
  // it is restored here by analogy with the two index file names below — confirm against upstream.
  this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
    new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
    new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
    // The remaining arguments follow the primary constructor's parameter order.
    startOffset,
    indexIntervalBytes,
    rollJitterMs,
    time)
/** The current size, in bytes, of this segment's log file. */
def size: Long = log.sizeInBytes().toLong
/**
 * Checks that the argument offset can be represented as an integer offset relative to the baseOffset.
 *
 * The relative offset must lie in `[0, Int.MaxValue]`: an offset below the base offset would produce a negative
 * relative offset, which is not a valid position within this segment.
 *
 * @param offset The absolute offset to test
 * @return true if `offset - baseOffset` is non-negative and fits in an Int
 */
def canConvertToRelativeOffset(offset: Long): Boolean = {
  val relativeOffset = offset - baseOffset
  // The explicit `offset >= baseOffset` comparison also rejects a subtraction that wrapped around.
  offset >= baseOffset && relativeOffset >= 0 && relativeOffset <= Int.MaxValue
}
/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed. An empty record set is ignored.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param firstOffset The first offset in the message set.
 * @param largestOffset The largest offset in the message set; it must pass `canConvertToRelativeOffset`.
 * @param largestTimestamp The largest timestamp in the message set.
 * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
 * @param records The log entries to append.
 * @throws IllegalArgumentException if `largestOffset` cannot be converted to a relative offset
 */
def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
  if (records.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
      .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))

    // File position at which this batch starts; it is what the offset index records below.
    val positionBeforeAppend = log.sizeInBytes()
    // The first batch written to an empty file fixes the timestamp used for time-based rolling.
    if (positionBeforeAppend == 0)
      rollingBasedTimestamp = Some(largestTimestamp)

    // Validate before writing so an unrepresentable offset never reaches the file.
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    val bytesWritten = log.append(records)
    trace(s"Appended $bytesWritten to ${log.file()} at offset $firstOffset")

    // Track the in-memory max timestamp together with the offset it belongs to.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }

    // Index sparsely: only once more than indexIntervalBytes have accumulated since the last entry.
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, positionBeforeAppend)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
/**
 * Find the physical file position for the first message with offset >= the requested offset.
 *
 * The startingFilePosition argument is an optimization that can be used if we already know a valid starting position
 * in the file higher than the greatest-lower-bound from the index.
 *
 * @param offset The offset we want to translate
 * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
 * when omitted, the search will begin at the position in the offset index.
 * @return The position in the log storing the message with the least offset >= the requested offset and the size of the
 *         message or null if no message meets this criteria.
 */
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
  val indexedPosition = index.lookup(offset).position
  // Begin at whichever lower bound is further into the file: the index hit or the caller's hint.
  val searchFrom = max(indexedPosition, startingFilePosition)
  log.searchForOffsetWithSize(offset, searchFrom)
}
/**
 * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
 * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
 *
 * @param startOffset A lower bound on the first offset to include in the message set we read
 * @param maxOffset An optional maximum offset for the message set we read
 * @param maxSize The maximum number of bytes to include in the message set we read
 * @param maxPosition The maximum position in the log segment that should be exposed for read
 * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
 * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 * @throws IllegalArgumentException if `maxSize` is negative
 */
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position.toInt
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  // With minOneMessage the budget is widened just enough to fit the first message.
  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val length = maxOffset match {
    case None =>
      // no max offset, just read until the max position
      min((maxPosition - startPosition).toInt, adjustedMaxSize)
    case Some(offset) =>
      // there is a max offset, translate it to a file position and use that to calculate the max read size;
      // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
      // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
      // offset between new leader's high watermark and the log end offset, we want to return an empty response.
      if (offset < startOffset)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null)
          logSize // the max offset is off the end of the log, use the end of the file
        else
          mapping.position // NOTE(review): branch restored; it was missing from the reviewed text
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }

  // NOTE(review): the slice expression was truncated in the reviewed text (", length)"); restored as a
  // read of `length` bytes starting at `startPosition` — confirm against upstream.
  FetchDataInfo(offsetMetadata, log.read(startPosition, length),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
/**
 * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
 *
 * @param maxMessageSize A bound on the memory allocation in the case of a corrupt message size--we will assume any message larger than this
 * is corrupt.
 *
 * @return The number of bytes truncated from the log
 */
// Rescans the log file entry by entry, re-appending offset index and time index entries and recomputing the
// in-memory max timestamp. Scanning stops at the first CorruptRecordException.
// NOTE(review): in this view the method shows no truncation of the log or indexes and no result expression,
// although it is declared to return Int — the tail of the method appears to be missing; confirm against the full source.
def recover(maxMessageSize: Int): Int = {
// Bytes of entries iterated so far without hitting corruption; also used as the file position stored in the offset index.
var validBytes = 0
// Value of validBytes at the time the most recent index entry was added.
var lastIndexEntry = 0
// Start from "no timestamp" so the maximum is recomputed from the entries scanned below.
maxTimestampSoFar = Record.NO_TIMESTAMP
try {
for (entry <- log.shallowEntries(maxMessageSize).asScala) {
val record = entry.record
// The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
if (record.timestamp > maxTimestampSoFar) {
maxTimestampSoFar = record.timestamp
offsetOfMaxTimestamp = entry.offset
// Build offset index
// Same sparse rule as append: add an entry only after more than indexIntervalBytes since the previous one.
if(validBytes - lastIndexEntry > indexIntervalBytes) {
val startOffset = entry.firstOffset
index.append(startOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
lastIndexEntry = validBytes
validBytes += entry.sizeInBytes()
} catch {
// Corruption ends the scan; validBytes keeps the size of the prefix read before the failure.
case e: CorruptRecordException =>
logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
.format(log.file.getAbsolutePath, validBytes, e.getMessage))
// Number of bytes in the log file beyond the validated prefix.
val truncated = log.sizeInBytes - validBytes
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
/**
 * Reload the in-memory max timestamp and its offset from the time index, optionally scanning the part of the
 * log after the last indexed entry for a larger timestamp.
 *
 * @param readToLogEnd If true, also scan the log from the position of the last time index entry onwards
 */
def loadLargestTimestamp(readToLogEnd: Boolean = false): Unit = {
  // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
  val lastIndexed = timeIndex.lastEntry
  maxTimestampSoFar = lastIndexed.timestamp
  offsetOfMaxTimestamp = lastIndexed.offset

  if (readToLogEnd) {
    val scanStart = index.lookup(lastIndexed.offset).position
    // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
    val largestAfterIndex = log.largestTimestampAfter(scanStart)
    if (largestAfterIndex.timestamp > lastIndexed.timestamp) {
      maxTimestampSoFar = largestAfterIndex.timestamp
      offsetOfMaxTimestamp = largestAfterIndex.offset
    }
  }
}
/** Short description of the segment: its base offset and current size in bytes. */
override def toString: String = s"LogSegment(baseOffset=$baseOffset, size=$size)"
/**
 * Truncate off all index and log entries with offsets >= the given offset.
 * If the given offset is larger than the largest message in this segment, do nothing.
 *
 * @param offset The offset to truncate to
 * @return The number of log bytes truncated
 */
// Cuts the log file back to the position of the first message with offset >= `offset` and resets the
// rolling/index bookkeeping that depends on the file contents.
// NOTE(review): no truncation or resizing of `index` / `timeIndex` and no final result expression are visible
// in this view, despite the comment below and the Int return type — lines appear to be missing; confirm.
def truncateTo(offset: Long): Int = {
val mapping = translateOffset(offset)
// No message at or beyond `offset`: nothing to truncate.
if (mapping == null)
return 0
// after truncation, reset and allocate more space for the (new currently active) index
val bytesTruncated = log.truncateTo(mapping.position.toInt)
// An emptied segment counts as newly created for time-based rolling.
if(log.sizeInBytes == 0) {
created = time.milliseconds
rollingBasedTimestamp = None
// Restart the sparse-index byte counter used by append.
bytesSinceLastIndexEntry = 0
// We may need to reload the max timestamp after truncation.
// Only reload when a real (non-negative) timestamp was being tracked.
if (maxTimestampSoFar >= 0)
loadLargestTimestamp(readToLogEnd = true)
/**
 * Calculate the offset that would be used for the next message to be appended to this segment.
 * Note that this is expensive: it reads the log from the last indexed offset to the end.
 *
 * @return The offset following the last entry found, or the base offset if nothing could be read
 */
def nextOffset(): Long = {
  val ms = read(index.lastOffset, None, log.sizeInBytes)
  if (ms == null) {
    // Nothing at or after the last indexed offset: treat as empty, same as the branch below.
    // NOTE(review): this branch was empty in the reviewed text; restored to match the declared Long result.
    baseOffset
  } else {
    ms.records.shallowEntries.asScala.lastOption match {
      case None => baseOffset
      case Some(last) => last.nextOffset
    }
  }
}
/**
 * Flush this log segment to disk
 */
// Runs the flush inside LogFlushStats.logFlushTimer so that its duration is recorded.
// NOTE(review): the statements inside the timed block are not visible in this view — presumably the log,
// offset index and time index are each flushed; confirm against the full source.
def flush() {
LogFlushStats.logFlushTimer.time {
/**
 * Change the suffix for the index and log file for this log segment. The log, offset index and time index
 * files are renamed in that order, each by replacing `oldSuffix` with `newSuffix` in its path.
 *
 * @param oldSuffix The suffix currently carried by the segment's file paths
 * @param newSuffix The suffix to put in its place
 * @throws KafkaStorageException if any rename fails with an IOException
 */
def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
  def resuffixed(current: File): File =
    new File(CoreUtils.replaceSuffix(current.getPath, oldSuffix, newSuffix))

  // Runs one rename and converts an IOException into a KafkaStorageException that names the file type.
  def renameOrFail(fileType: String)(rename: => Unit): Unit = {
    try rename
    catch {
      case e: IOException =>
        throw new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e)
    }
  }

  renameOrFail("log")(log.renameTo(resuffixed(log.file)))
  renameOrFail("index")(index.renameTo(resuffixed(index.file)))
  renameOrFail("timeindex")(timeIndex.renameTo(resuffixed(timeIndex.file)))
}
/**
 * Append the largest time index entry to the time index when this log segment becomes an inactive segment.
 * This entry will be used to decide when to delete the segment.
 */
def onBecomeInactiveSegment(): Unit =
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
/**
 * The time this segment has waited to be rolled.
 * If the first message has a timestamp we use the message timestamp to determine when to roll a segment. A segment
 * is rolled if the difference between the new message's timestamp and the first message's timestamp exceeds the
 * segment rolling time.
 * If the first message does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
 * segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
 * segment rolling time.
 *
 * @param now The current wall clock time
 * @param messageTimestamp The timestamp of the new message
 * @return `messageTimestamp` minus the first message's timestamp when that timestamp is non-negative,
 *         otherwise `now` minus the segment creation time
 */
def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
  // Load the timestamp of the first message into memory
  if (rollingBasedTimestamp.isEmpty) {
    val iter = log.shallowEntries.iterator()
    if (iter.hasNext)
      // NOTE(review): this expression was truncated in the reviewed text ("Some("); restored using the same
      // entry.record.timestamp accessors that recover() uses — confirm against upstream.
      rollingBasedTimestamp = Some(iter.next().record.timestamp)
  }
  rollingBasedTimestamp match {
    case Some(t) if t >= 0 => messageTimestamp - t
    // Empty segment or a negative (absent) timestamp: fall back to wall clock time.
    case _ => now - created
  }
}
/**
 * Search the message offset based on timestamp.
 * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
 * greater than or equal to the target timestamp.
 *
 * If all the messages in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
 * timestamp will be the max timestamp in the segment.
 *
 * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
 * the returned offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
 *
 * This method only returns None when the log is not empty but we did not see any messages when scanning the log
 * from the indexed position. This could happen if the log is truncated after we get the indexed position but
 * before we scan the log from there. In this case we simply return None and the caller will need to check on
 * the truncated log and maybe retry or even do the search on another log segment.
 *
 * @param timestamp The timestamp to search for.
 * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
 *         target timestamp. None will be returned if there is no such message.
 */
def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
  // Time index entry with a timestamp less than or equal to the target, mapped to a file position.
  val indexedEntry = timeIndex.lookup(timestamp)
  val scanStart = index.lookup(indexedEntry.offset).position

  // The log search yields null when nothing is found from the indexed position.
  log.searchForTimestamp(timestamp, scanStart) match {
    case null => None
    case hit => Some(TimestampOffset(hit.timestamp, hit.offset))
  }
}
/**
 * Close this log segment
 */
// Best-effort append of the largest timestamp seen to the time index; CoreUtils.swallow wraps the call so a
// failure here is not propagated (presumably logged instead — confirm swallow's behavior).
// NOTE(review): no close calls on `log`, `index` or `timeIndex` are visible in this view — the rest of the
// method appears to be missing; confirm against the full source.
def close() {
CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true))
/**
 * Delete this log segment from the filesystem. All three files (log, offset index, time index) are attempted
 * before any failure is reported; a failed delete only counts as an error if the file still exists.
 *
 * @throws KafkaStorageException if the delete fails.
 */
def delete(): Unit = {
  val logDeleted = log.delete()
  val indexDeleted = index.delete()
  val timeIndexDeleted = timeIndex.delete()

  // `file` is by-name so it is only looked up when the delete reported failure.
  def failIfStillPresent(deleted: Boolean, description: String)(file: => File): Unit = {
    if (!deleted && file.exists)
      throw new KafkaStorageException("Delete of " + description + " " + file.getName + " failed.")
  }

  failIfStillPresent(logDeleted, "log")(log.file)
  failIfStillPresent(indexDeleted, "index")(index.file)
  failIfStillPresent(timeIndexDeleted, "time index")(timeIndex.file)
}
/**
 * The last modified time of this log segment as a unix time stamp, taken from the log file.
 */
def lastModified: Long = log.file.lastModified()
/**
 * The largest timestamp this segment contains. Falls back to the log file's last modified time when no
 * non-negative timestamp has been recorded.
 *
 * @return the max timestamp seen so far if it is >= 0, otherwise `lastModified`
 */
def largestTimestamp: Long = {
  // Read the volatile field once: a concurrent reset between the check and the use could otherwise
  // let a negative value escape.
  val maxTimestamp = maxTimestampSoFar
  if (maxTimestamp >= 0) maxTimestamp else lastModified
}
/**
 * Change the last modified time for this log segment
 */
// Setter half of the `lastModified` property, enabling `segment.lastModified = ms`.
// NOTE(review): the body is not visible in this view — presumably `ms` is a unix timestamp in milliseconds
// applied to the segment's files; confirm against the full source.
def lastModified_=(ms: Long) = {
// Metrics holder for log flushes. LogSegment.flush() runs its work inside `logFlushTimer`.
object LogFlushStats extends KafkaMetricsGroup {
// Timer created under the metric name "LogFlushRateAndTimeMs", with MILLISECONDS and SECONDS as its two time units.
val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))