blob: 21c212fadf623635d46f48e26779437b5595fe9d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import java.nio.ByteBuffer
import kafka.common.InvalidOffsetException
import kafka.utils.CoreUtils._
import kafka.utils.Logging
import org.apache.kafka.common.record.Record
/**
* An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
* sparse, i.e. it may not hold an entry for all the messages in the segment.
*
* The index is stored in a file that is preallocated to hold a fixed maximum amount of 12-byte time index entries.
* The file format is a series of time index entries. The physical format is an 8-byte timestamp and a 4-byte "relative"
* offset used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen
* before OFFSET is TIMESTAMP. i.e. Any message whose timestamp is greater than TIMESTAMP must come after OFFSET.
*
* All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*
* The timestamps in the same time index file are guaranteed to be monotonically increasing.
*
* The index supports timestamp lookup for a memory map of this file. The lookup is done using a binary search to find
* the offset of the message whose indexed timestamp is closest to but smaller than or equal to the target timestamp.
*
* Time index files can be opened in two ways: either as an empty, mutable index that allows appends or
* an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
* immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
*
* No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
*
*/
class TimeIndex(file: File,
                baseOffset: Long,
                maxIndexSize: Int = -1)
    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize) with Logging {

  /** Each entry is an 8-byte timestamp followed by a 4-byte relative offset. */
  override def entrySize = 12

  // We override the full check to reserve the last time index entry slot for the on-roll call.
  override def isFull: Boolean = entries >= maxEntries - 1

  // Timestamp stored in the first 8 bytes of the n-th entry of the given buffer.
  private def timestamp(buffer: ByteBuffer, n: Int): Long = buffer.getLong(n * entrySize)

  // Relative offset stored in the last 4 bytes of the n-th entry of the given buffer.
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)

  /**
   * The last entry in the index. If the index is empty, (Record.NO_TIMESTAMP, baseOffset) is
   * returned as the sentinel.
   */
  def lastEntry: TimestampOffset = {
    inLock(lock) {
      _entries match {
        case 0 => TimestampOffset(Record.NO_TIMESTAMP, baseOffset)
        case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
      }
    }
  }

  /**
   * Get the nth timestamp mapping from the time index
   * @param n The entry number in the time index
   * @return The timestamp/offset pair at that entry
   * @throws IllegalArgumentException if n is beyond the number of entries in the index
   */
  def entry(n: Int): TimestampOffset = {
    maybeLock(lock) {
      if(n >= _entries)
        throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
      // Duplicate so the shared mmap's position/limit are not disturbed.
      val idx = mmap.duplicate
      TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
    }
  }

  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
    // Translate the stored relative offset back to a full offset for callers.
    TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
  }

  /**
   * Attempt to append a time index entry to the time index.
   * The new entry is appended only if both the timestamp and offsets are greater than the last appended timestamp and
   * the last appended offset.
   *
   * @param timestamp The timestamp of the new time index entry
   * @param offset The offset of the new time index entry
   * @param skipFullCheck To skip checking whether the segment is full or not. We only skip the check when the segment
   *                      gets rolled or the segment is closed.
   * @throws InvalidOffsetException if the offset is strictly smaller than the last appended offset
   * @throws IllegalStateException if the timestamp is strictly smaller than the last appended timestamp
   */
  def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
    inLock(lock) {
      if (!skipFullCheck)
        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
      // We do not throw exception when the offset equals to the offset of last entry. That means we are trying
      // to insert the same time index entry as the last entry.
      // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
      // because that could happen in the following two scenarios:
      // 1. A log segment is closed.
      // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
      // The guards below are strict (<) on purpose: equality is tolerated and silently ignored.
      if (_entries != 0 && offset < lastEntry.offset)
        throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d smaller than the last offset appended (%d) to %s."
          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
      if (_entries != 0 && timestamp < lastEntry.timestamp)
        throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d smaller than the last timestamp appended (%d) to %s."
          .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
      // We only append to the time index when the timestamp is greater than the last inserted timestamp.
      // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
      // index will be empty.
      if (timestamp > lastEntry.timestamp) {
        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
        // Write order matters: 8-byte timestamp first, then the 4-byte relative offset.
        mmap.putLong(timestamp)
        mmap.putInt((offset - baseOffset).toInt)
        _entries += 1
        require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
      }
    }
  }

  /**
   * Find the time index entry whose timestamp is less than or equal to the given timestamp.
   * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is
   * returned.
   *
   * @param targetTimestamp The timestamp to look up.
   * @return The time index entry found.
   */
  def lookup(targetTimestamp: Long): TimestampOffset = {
    maybeLock(lock) {
      // Duplicate so the binary search does not disturb the shared mmap's position.
      val idx = mmap.duplicate
      val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
      if (slot == -1)
        TimestampOffset(Record.NO_TIMESTAMP, baseOffset)
      else {
        val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset]
        TimestampOffset(entry.timestamp, entry.offset)
      }
    }
  }

  override def truncate() = truncateToEntries(0)

  /**
   * Remove all entries from the index which have an offset greater than or equal to the given offset.
   * Truncating to an offset larger than the largest in the index has no effect.
   */
  override def truncateTo(offset: Long) {
    inLock(lock) {
      val idx = mmap.duplicate
      val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE)

      /* There are 3 cases for choosing the new size
       * 1) if there is no entry in the index <= the offset, delete everything
       * 2) if there is an entry for this exact offset, delete it and everything larger than it
       * 3) if there is no entry for this offset, delete everything larger than the next smallest
       */
      val newEntries =
        if(slot < 0)
          0
        else if(relativeOffset(idx, slot) == offset - baseOffset)
          slot
        else
          slot + 1
      truncateToEntries(newEntries)
    }
  }

  /**
   * Truncates index to a known number of entries.
   */
  private def truncateToEntries(entries: Int) {
    inLock(lock) {
      _entries = entries
      // Reposition the mmap so the next append lands right after the surviving entries.
      mmap.position(_entries * entrySize)
    }
  }

  /**
   * Verify the invariants of a non-empty index: the last timestamp is at least the first
   * timestamp, the last offset is at least the base offset, and the file length is a whole
   * number of entries.
   */
  override def sanityCheck() {
    val entry = lastEntry
    val lastTimestamp = entry.timestamp
    val lastOffset = entry.offset
    require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
        s"is $lastTimestamp which is smaller than the first timestamp ${timestamp(mmap, 0)}")
    require(_entries == 0 || lastOffset >= baseOffset,
      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
        s"is $lastOffset which is smaller than the first offset $baseOffset")
    val len = file.length()
    require(len % entrySize == 0,
      "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
        " bytes which is not a multiple of " + entrySize + ".")
  }
}