| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.log |
| |
| import scala.math._ |
| import java.io._ |
| import java.nio._ |
| import java.nio.channels._ |
| import java.util.concurrent.locks._ |
| import java.util.concurrent.atomic._ |
| import kafka.utils._ |
| import kafka.utils.Utils.inLock |
| import kafka.common.InvalidOffsetException |
| |
| /** |
| * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: |
| * that is it may not hold an entry for all messages in the log. |
| * |
| * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries. |
| * |
| * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant |
| * to locate the offset/location pair for the greatest offset less than or equal to the target offset. |
| * |
| * Index files can be opened in two ways: either as an empty, mutable index that allows appends or |
| * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an |
| * immutable one and truncate off any extra bytes. This is done when the index file is rolled over. |
| * |
| * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. |
| * |
| * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the |
| * message with that offset. The offset stored is relative to the base offset of the index file. So, for example, |
| * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use |
| * only 4 bytes for the offset. |
| * |
| * The frequency of entries is up to the user of this class. |
| * |
| * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal |
| * storage format. |
| */ |
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {

  /* taken by every mutating operation; read paths take it only on Windows (see maybeLock) */
  private val lock = new ReentrantLock

  /* initialize the memory mapping for this index */
  private var mmap: MappedByteBuffer = {
    val newlyCreated = file.createNewFile()
    val raf = new RandomAccessFile(file, "rw")
    try {
      /* pre-allocate the file if necessary */
      if(newlyCreated) {
        if(maxIndexSize < 8)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
      }

      /* memory-map the file */
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      /* set the position in the index for the next entry */
      if(newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is all valid and set position to last entry
        idx.position(roundToExactMultiple(idx.limit, 8))
      idx
    } finally {
      // the mapping stays valid after the file handle is closed
      Utils.swallow(raf.close())
    }
  }

  /* the number of eight-byte entries currently in the index (the reference itself is never reassigned) */
  private val size = new AtomicInteger(mmap.position / 8)

  /**
   * The maximum number of eight-byte entries this index can hold
   */
  @volatile
  var maxEntries: Int = mmap.limit / 8

  /* the last offset in the index */
  var lastOffset: Long = readLastEntry.offset

  debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))

  /**
   * The last entry in the index
   *
   * @return The absolute offset and file position of the final entry, or the pair (baseOffset, 0)
   *         if the index holds no entries.
   */
  def readLastEntry(): OffsetPosition = {
    inLock(lock) {
      size.get match {
        case 0 => OffsetPosition(baseOffset, 0)
        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
      }
    }
  }

  /**
   * Find the largest offset less than or equal to the given targetOffset
   * and return a pair holding this offset and its corresponding physical file position.
   *
   * @param targetOffset The offset to look up.
   *
   * @return The offset found and the corresponding file position for this offset.
   * If the target offset is smaller than the least entry in the index (or the index is empty),
   * the pair (baseOffset, 0) is returned.
   */
  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {
      // search a duplicate so the shared buffer's position is left alone
      val idx = mmap.duplicate
      val slot = indexSlotFor(idx, targetOffset)
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
    }
  }

  /**
   * Find the slot in which the largest offset less than or equal to the given
   * target offset is stored.
   *
   * @param idx The index buffer
   * @param targetOffset The offset to look for
   *
   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
   */
  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
    // we only store the difference from the base offset so calculate that
    val relOffset = targetOffset - baseOffset

    // read the entry count once so the emptiness check and the search bound agree
    val numEntries = entries

    /*
     * Binary search over [lo, hi] for the last slot whose offset is <= relOffset.
     * The midpoint is rounded up so that "lo = mid" always makes progress.
     */
    @scala.annotation.tailrec
    def search(lo: Int, hi: Int): Int = {
      if(lo >= hi) {
        lo
      } else {
        val mid = lo + (hi - lo + 1) / 2
        val found = relativeOffset(idx, mid)
        if(found == relOffset)
          mid
        else if(found < relOffset)
          search(mid, hi)
        else
          search(lo, mid - 1)
      }
    }

    if(numEntries == 0)
      -1 // the index is empty
    else if(relativeOffset(idx, 0) > relOffset)
      -1 // the target offset is smaller than the least offset
    else
      search(0, numEntries - 1)
  }

  /* return the nth offset relative to the base offset */
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)

  /* return the nth physical position */
  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

  /**
   * Get the nth offset mapping from the index.
   *
   * Note that the offset in the returned pair is the raw stored value, i.e. it is relative to
   * baseOffset and is NOT translated to an absolute offset (unlike lookup and readLastEntry).
   *
   * @param n The entry number in the index
   * @return The (relative offset, position) pair at that entry
   * @throws IllegalArgumentException if n is not smaller than the number of entries
   */
  def entry(n: Int): OffsetPosition = {
    maybeLock(lock) {
      if(n >= entries)
        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
      val idx = mmap.duplicate
      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
    }
  }

  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
   *
   * @param offset The absolute offset of the message
   * @param position The physical file position of the message
   * @throws IllegalArgumentException if the index is full, or if the distance between the offset and
   *                                  the base offset cannot be represented in the 4 bytes available
   * @throws InvalidOffsetException if the index is non-empty and the offset is not larger than the last offset
   */
  def append(offset: Long, position: Int): Unit = {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        // only 4 bytes are stored per offset; refuse to write a silently truncated value
        val relative = offset - baseOffset
        require(relative == relative.toInt,
                "Attempt to append offset %d to %s: its distance from base offset %d does not fit in a 4-byte relative offset."
                  .format(offset, file.getAbsolutePath, baseOffset))
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt(relative.toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }

  /**
   * True iff there are no more slots available in this index
   */
  def isFull: Boolean = entries >= this.maxEntries

  /**
   * Truncate the entire index, deleting all entries
   */
  def truncate(): Unit = truncateToEntries(0)

  /**
   * Remove all entries from the index which have an offset greater than or equal to the given offset.
   * Truncating to an offset larger than the largest in the index has no effect.
   */
  def truncateTo(offset: Long): Unit = {
    inLock(lock) {
      val idx = mmap.duplicate
      val slot = indexSlotFor(idx, offset)

      /* There are 3 cases for choosing the new size
       * 1) if there is no entry in the index <= the offset, delete everything
       * 2) if there is an entry for this exact offset, delete it and everything larger than it
       * 3) if there is no entry for this offset, delete everything larger than the next smallest
       */
      val newEntries =
        if(slot < 0)
          0
        else if(relativeOffset(idx, slot) == offset - baseOffset)
          slot
        else
          slot + 1
      truncateToEntries(newEntries)
    }
  }

  /**
   * Truncates index to a known number of entries: resets the entry count, moves the write
   * position of the buffer accordingly and recomputes the last offset.
   */
  private def truncateToEntries(newEntryCount: Int): Unit = {
    inLock(lock) {
      this.size.set(newEntryCount)
      mmap.position(this.size.get * 8)
      this.lastOffset = readLastEntry.offset
    }
  }

  /**
   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
   * the file.
   */
  def trimToValidSize(): Unit = {
    inLock(lock) {
      resize(entries * 8)
    }
  }

  /**
   * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
   * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
   * loading segments from disk or truncating back to an old segment where a new log segment became active;
   * we want to reset the index size to maximum index size to avoid rolling new segment.
   *
   * @param newSize The requested size in bytes; it is rounded down to a multiple of 8
   */
  def resize(newSize: Int): Unit = {
    inLock(lock) {
      val raf = new RandomAccessFile(file, "rws")
      val roundedNewSize = roundToExactMultiple(newSize, 8)
      // remember where the next entry goes so it can be restored on the new mapping
      val position = this.mmap.position

      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if(Os.isWindows)
        forceUnmap(this.mmap)
      try {
        raf.setLength(roundedNewSize)
        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        this.maxEntries = this.mmap.limit / 8
        this.mmap.position(position)
      } finally {
        Utils.swallow(raf.close())
      }
    }
  }

  /**
   * Forcefully free the buffer's mmap. We do this only on windows.
   * Any failure is logged as a warning and otherwise ignored.
   */
  private def forceUnmap(m: MappedByteBuffer): Unit = {
    try {
      m match {
        case direct: sun.nio.ch.DirectBuffer => direct.cleaner().clean()
        case _ => // not a direct buffer, nothing to release
      }
    } catch {
      case t: Throwable => warn("Error when freeing index buffer", t)
    }
  }

  /**
   * Flush the data in the index to disk
   */
  def flush(): Unit = {
    inLock(lock) {
      mmap.force()
    }
  }

  /**
   * Delete this index file
   * @return true iff the file was deleted
   */
  def delete(): Boolean = {
    info("Deleting index " + this.file.getAbsolutePath)
    this.file.delete()
  }

  /** The number of entries in this index */
  def entries(): Int = size.get

  /**
   * The number of bytes actually used by this index
   */
  def sizeInBytes(): Int = 8 * entries

  /** Close the index, trimming the file down to the entries actually written */
  def close(): Unit = {
    trimToValidSize()
  }

  /**
   * Rename the file that backs this offset index.
   * Note that the file reference held by this index is switched to f whether or not the rename succeeded.
   * @return true iff the rename was successful
   */
  def renameTo(f: File): Boolean = {
    val success = this.file.renameTo(f)
    this.file = f
    success
  }

  /**
   * Do a basic sanity check on this index to detect obvious problems
   * @throws IllegalArgumentException if any problems are found
   */
  def sanityCheck(): Unit = {
    require(entries == 0 || lastOffset > baseOffset,
            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
            .format(file.getAbsolutePath, lastOffset, baseOffset))
    val len = file.length()
    require(len % 8 == 0,
            "Index file " + file.getName + " is corrupt, found " + len +
            " bytes which is not positive or not a multiple of 8.")
  }

  /**
   * Round a number down to the greatest exact multiple of the given factor less than or equal to the given number.
   * E.g. roundToExactMultiple(67, 8) == 64
   */
  private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)

  /**
   * Execute the given function in a lock only if we are running on windows. We do this
   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
   * and this requires synchronizing reads.
   */
  private def maybeLock[T](lock: Lock)(fun: => T): T = {
    // decide once so that lock() and unlock() are always paired
    val mustLock = Os.isWindows
    if(mustLock)
      lock.lock()
    try {
      fun
    } finally {
      if(mustLock)
        lock.unlock()
    }
  }
}