/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.{File, IOException, RandomAccessFile}
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.channels.FileChannel
import java.util.concurrent.locks.{Lock, ReentrantLock}
import kafka.log.IndexSearchType.IndexSearchEntity
import kafka.utils.CoreUtils.inLock
import kafka.utils.{CoreUtils, Logging, Os}
import org.apache.kafka.common.utils.Utils
import sun.nio.ch.DirectBuffer
import scala.math.ceil
/**
* The abstract index class which holds entry format agnostic methods.
*
* @param file The index file
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
    extends Logging {

  /** The size in bytes of a single entry; fixed per concrete index format. */
  protected def entrySize: Int

  /** Serializes structural operations on the mapping: resize, flush, delete, truncation. */
  protected val lock = new ReentrantLock

  /**
   * The memory mapping of the backing index file.
   *
   * A newly created file is pre-allocated to the largest multiple of `entrySize`
   * not exceeding `maxIndexSize`, so the mapping never contains a partial entry.
   * For a pre-existing file the buffer position is set to the last complete entry
   * (the file is assumed to be valid); the position marks where the next entry
   * will be written.
   *
   * Declared @volatile because it is re-assigned under `lock` (see `resize` and
   * `delete`) while readers may observe it without holding the lock.
   */
  @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = file.createNewFile()
    val raf = new RandomAccessFile(file, "rw")
    try {
      /* pre-allocate the file if necessary */
      if(newlyCreated) {
        // The index must be able to hold at least one entry.
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        // Round down so the mapped region is always entry-aligned.
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }
      /* memory-map the file */
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
      /* set the position in the index for the next entry */
      if(newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
      idx
    } finally {
      // The mapping remains valid after the file handle is closed; swallow close
      // errors so they cannot mask an exception thrown above.
      CoreUtils.swallow(raf.close())
    }
  }

  /**
   * The maximum number of entries this index can hold
   */
  @volatile
  private[this] var _maxEntries = mmap.limit / entrySize

  /** The number of entries in this index */
  @volatile
  protected var _entries = mmap.position / entrySize

  /**
   * True iff there are no more slots available in this index
   */
  def isFull: Boolean = _entries >= _maxEntries

  /** The maximum number of entries the current mapping can hold. */
  def maxEntries: Int = _maxEntries

  /** The number of entries currently stored in this index. */
  def entries: Int = _entries

  /**
   * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
   * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
   * loading segments from disk or truncating back to an old segment where a new log segment became active;
   * we want to reset the index size to maximum index size to avoid rolling new segment.
   *
   * @param newSize the requested size in bytes; rounded down to a multiple of `entrySize`
   */
  def resize(newSize: Int) {
    inLock(lock) {
      val raf = new RandomAccessFile(file, "rw")
      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
      // Remember the logical end of the index so it can be restored in the new mapping.
      val position = mmap.position
      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if(Os.isWindows)
        forceUnmap(mmap)
      try {
        raf.setLength(roundedNewSize)
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        _maxEntries = mmap.limit / entrySize
        mmap.position(position)
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }
  }

  /**
   * Rename the file that backs this offset index
   *
   * @throws IOException if rename fails
   */
  def renameTo(f: File) {
    // Even if the move fails, update `file` so subsequent operations target the
    // intended path.
    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
    finally file = f
  }

  /**
   * Flush the data in the index to disk
   */
  def flush() {
    inLock(lock) {
      mmap.force()
    }
  }

  /**
   * Delete this index file
   *
   * @return true iff the underlying file was deleted
   */
  def delete(): Boolean = {
    info(s"Deleting index ${file.getAbsolutePath}")
    inLock(lock) {
      // On JVM, a memory mapping is typically unmapped by garbage collector.
      // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
      // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
      // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
      CoreUtils.swallow(forceUnmap(mmap))
      // Accessing unmapped mmap crashes JVM by SEGV.
      // Accessing it after this method called sounds like a bug but for safety, assign null and do not allow later access.
      mmap = null
    }
    file.delete()
  }

  /**
   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
   * the file.
   */
  def trimToValidSize() {
    inLock(lock) {
      resize(entrySize * _entries)
    }
  }

  /**
   * The number of bytes actually used by this index
   */
  def sizeInBytes = entrySize * _entries

  /** Close the index */
  def close() {
    trimToValidSize()
  }

  /**
   * Do a basic sanity check on this index to detect obvious problems
   *
   * @throws IllegalArgumentException if any problems are found
   */
  def sanityCheck(): Unit

  /**
   * Remove all the entries from the index.
   */
  def truncate(): Unit

  /**
   * Remove all entries from the index which have an offset greater than or equal to the given offset.
   * Truncating to an offset larger than the largest in the index has no effect.
   */
  def truncateTo(offset: Long): Unit

  /**
   * Forcefully free the buffer's mmap.
   *
   * Uses the sun-internal `DirectBuffer.cleaner()` hook; after this returns the
   * buffer must never be touched again (doing so crashes the JVM with SIGSEGV).
   * Any failure is logged and swallowed rather than propagated.
   */
  protected def forceUnmap(m: MappedByteBuffer) {
    try {
      m match {
        case buffer: DirectBuffer =>
          val bufferCleaner = buffer.cleaner()
          /* cleaner can be null if the mapped region has size 0 */
          if (bufferCleaner != null)
            bufferCleaner.clean()
        case _ =>
      }
    } catch {
      // Deliberately broad: unmapping is best-effort cleanup and must not
      // propagate failures to callers such as delete().
      case t: Throwable => error("Error when freeing index buffer", t)
    }
  }

  /**
   * Execute the given function in a lock only if we are running on windows. We do this
   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
   * and this requires synchronizing reads.
   *
   * @param lock the lock to (conditionally) hold while running `fun`
   * @param fun the code to execute
   * @return the result of `fun`
   */
  protected def maybeLock[T](lock: Lock)(fun: => T): T = {
    if(Os.isWindows)
      lock.lock()
    try {
      fun
    } finally {
      if(Os.isWindows)
        lock.unlock()
    }
  }

  /**
   * To parse an entry in the index.
   *
   * @param buffer the buffer of this memory mapped index.
   * @param n the slot
   * @return the index entry stored in the given slot.
   */
  protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

  /**
   * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
   * The comparison is made using the `IndexEntry.compareTo()` method.
   *
   * @param idx The index buffer
   * @param target The index key to look for
   * @param searchEntity whether to compare the target against the entry's key or its value
   * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
   */
  protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
    // check if the index is empty
    if(_entries == 0)
      return -1
    // check if the target offset is smaller than the least offset
    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return -1
    // binary search for the entry
    var lo = 0
    var hi = _entries - 1
    while(lo < hi) {
      // Upward-biased midpoint (ceil): since lo < hi this yields mid > lo, so the
      // `lo = mid` branch always makes progress and the loop terminates.
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        lo = mid
      else
        return mid
    }
    // lo is the greatest slot whose entry is <= target
    lo
  }

  /**
   * Compare the chosen field (key or value) of `indexEntry` against `target`.
   *
   * @return negative, zero, or positive per the `compareTo` contract
   */
  private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
    searchEntity match {
      case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target)
      case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target)
    }
  }

  /**
   * Round a number to the greatest exact multiple of the given factor less than the given number.
   * E.g. roundDownToExactMultiple(67, 8) == 64
   */
  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
}
/**
 * Distinguishes which side of an index entry a search compares against:
 * the entry's key or the entry's value.
 */
object IndexSearchType extends Enumeration {
  /** Alias used by search methods to refer to this enumeration's value type. */
  type IndexSearchEntity = Value

  /** Search by the entry's key. */
  val KEY: IndexSearchEntity = Value

  /** Search by the entry's value. */
  val VALUE: IndexSearchEntity = Value
}