/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.util.concurrent.atomic._
import java.text.NumberFormat
import java.io._
import kafka.message._
import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
import kafka.server.BrokerTopicStat
private[log] object Log {
val FileSuffix = ".kafka"
/**
* Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges,
* but instead of checking for equality, looks within the range. Takes the array size as an argument in case
* the array grows while the search is in progress.
*
* TODO: This should move into SegmentList.scala
*/
def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
if(ranges.size < 1)
return None
// check out of bounds
if(value < ranges(0).start || value > ranges(arraySize - 1).start + ranges(arraySize - 1).size)
throw new OffsetOutOfRangeException("offset " + value + " is out of range")
// check at the end
if (value == ranges(arraySize - 1).start + ranges(arraySize - 1).size)
return None
var low = 0
var high = arraySize - 1
while(low <= high) {
val mid = (high + low) / 2
val found = ranges(mid)
if(found.contains(value))
return Some(found)
else if (value < found.start)
high = mid - 1
else
low = mid + 1
}
None
}
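/** Search the entire ranges array for the range containing the given value */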
def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
findRange(ranges, value, ranges.length)
/**
* Make a log segment file name from the given offset. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
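* For example, nameFromOffset(0) yields "00000000000000000000.kafka".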
*/
def nameFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset) + Log.FileSuffix
}
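/**
* The offsets to return for an empty log: an empty log starts and ends at offset 0, so both the earliest
* and latest offset requests are answered with offset 0; all other times match nothing.
*/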
def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
Array(0L)
else
Array()
}
}
/**
* A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size.
*/
private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range {
var firstAppendTime: Option[Long] = None
@volatile var deleted = false
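/* The size in bytes of this segment, as given by the message set's high watermark */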
def size: Long = messageSet.highWaterMark
private def updateFirstAppendTime() {
if (firstAppendTime.isEmpty)
firstAppendTime = Some(time.milliseconds)
}
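/* Append the given messages to this segment, recording the time of the first append */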
def append(messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
messageSet.append(messages)
updateFirstAppendTime()
}
}
override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
}
/**
* An append-only log for storing messages.
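*
* All modifications to the log are synchronized on an internal lock.
*
* A minimal usage sketch (hypothetical directory and tuning values; the Message-varargs constructor of
* ByteBufferMessageSet is assumed from kafka.message):
* val log = new Log(new File("/tmp/test-0"), SystemTime, 1024 * 1024, 1000000, 10000, 60 * 60 * 1000L, false)
* log.append(new ByteBufferMessageSet(new Message("hello".getBytes)))
* val messages = log.read(offset = 0, length = 1024)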
*/
@threadsafe
private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val maxMessageSize: Int,
val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean) extends Logging {
/* A lock that guards all modifications to the log */
private val lock = new Object
/* The current number of unflushed messages appended to the log */
private val unflushed = new AtomicInteger(0)
/* The last time the log was flushed */
private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
/* The actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
/* The name of this log */
val name = dir.getName()
private val logStats = new LogStats(this)
Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
/* Load the log segments from the log files on disk */
private def loadSegments(): SegmentList[LogSegment] = {
// open all the segments read-only
val accum = new ArrayList[LogSegment]
val ls = dir.listFiles()
if(ls != null) {
for(file <- ls if file.isFile && file.toString.endsWith(Log.FileSuffix)) {
if(!file.canRead)
throw new IOException("Could not read file " + file)
val filename = file.getName()
val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
val messageSet = new FileMessageSet(file, false)
accum.add(new LogSegment(file, time, messageSet, start))
}
}
if(accum.size == 0) {
// no existing segments, create a new mutable segment
val newFile = new File(dir, Log.nameFromOffset(0))
val set = new FileMessageSet(newFile, true)
accum.add(new LogSegment(newFile, time, set, 0))
} else {
// there is at least one existing segment, validate and recover them/it
// sort segments into ascending order for fast searching
Collections.sort(accum, new Comparator[LogSegment] {
def compare(s1: LogSegment, s2: LogSegment): Int = {
if(s1.start == s2.start) 0
else if(s1.start < s2.start) -1
else 1
}
})
validateSegments(accum)
// make the final segment mutable and run recovery on it if necessary
val last = accum.remove(accum.size - 1)
last.messageSet.close()
info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
val mutable = new LogSegment(last.file, time, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
accum.add(mutable)
}
new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
}
/**
* Check that the ranges and sizes add up, otherwise we have lost some data somewhere
*/
private def validateSegments(segments: ArrayList[LogSegment]) {
lock synchronized {
for(i <- 0 until segments.size - 1) {
val curr = segments.get(i)
val next = segments.get(i+1)
if(curr.start + curr.size != next.start)
throw new IllegalStateException("The following segments don't validate: " +
curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
}
}
}
/**
* The number of segments in the log
*/
def numberOfSegments: Int = segments.view.length
/**
* Close this log
*/
def close() {
lock synchronized {
for(seg <- segments.view)
seg.messageSet.close()
}
}
/**
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
* The messages are validated and only the valid bytes of the set are appended.
*/
def append(messages: ByteBufferMessageSet): Unit = {
// validate the messages
messages.verifyMessageSize(maxMessageSize)
var numberOfMessages = 0
for(messageAndOffset <- messages) {
if(!messageAndOffset.message.isValid)
throw new InvalidMessageException()
numberOfMessages += 1
}
BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
logStats.recordAppendedMessages(numberOfMessages)
// truncate the message set's buffer up to validBytes before appending it to the on-disk log
val validByteBuffer = messages.getBuffer.duplicate()
val messageSetValidBytes = messages.validBytes
if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
" Message set cannot be appended to log. Possible causes are corrupted produce requests")
validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
val validMessages = new ByteBufferMessageSet(validByteBuffer)
// they are valid, insert them in the log
lock synchronized {
try {
var segment = segments.view.last
maybeRoll(segment)
segment = segments.view.last
segment.append(validMessages)
maybeFlush(numberOfMessages)
}
catch {
case e: IOException =>
fatal("Halting due to unrecoverable I/O error while handling producer request", e)
Runtime.getRuntime.halt(1)
case e2 => throw e2
}
}
}
/**
* Read a message set of up to the given length in bytes from the log, starting at the given byte offset
*/
def read(offset: Long, length: Int): MessageSet = {
val view = segments.view
Log.findRange(view, offset, view.length) match {
case Some(segment) => segment.messageSet.read((offset - segment.start), length)
case _ => MessageSet.Empty
}
}
/**
* Mark the longest prefix of log segments matching the given predicate as deleted, remove them from the
* segment list, and return the removed segments
*/
def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
lock synchronized {
val view = segments.view
val deletable = view.takeWhile(predicate)
for(seg <- deletable)
seg.deleted = true
var numToDelete = deletable.size
// if we are deleting everything, create a new empty segment
if(numToDelete == view.size) {
if (view(numToDelete - 1).size > 0)
roll()
else {
// If the last segment to be deleted is empty and we roll the log, the new segment will have the same
// file name. So simply reuse the last segment and reset the modified time.
view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
numToDelete -= 1
}
}
segments.trunc(numToDelete)
}
}
/**
* Get the size of the log in bytes
*/
def size: Long =
segments.view.foldLeft(0L)(_ + _.size)
/**
* The byte offset of the message that will be appended next.
*/
def nextAppendOffset: Long = {
flush()
val last = segments.view.last
last.start + last.size
}
/**
* Get the current high watermark of the log
*/
def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark
/**
* Roll the log over to a new segment if the active segment is over the maximum size or age
*/
private def maybeRoll(segment: LogSegment) {
if((segment.messageSet.sizeInBytes > maxSize) ||
((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
roll()
}
/**
* Create a new segment and make it active
*/
def roll() {
lock synchronized {
val newOffset = nextAppendOffset
val newFile = new File(dir, Log.nameFromOffset(newOffset))
if (newFile.exists) {
warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
newFile.delete()
}
debug("Rolling log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, time, new FileMessageSet(newFile, true), newOffset))
}
}
/**
* Flush the log if necessary
*/
private def maybeFlush(numberOfMessages : Int) {
if(unflushed.addAndGet(numberOfMessages) >= flushInterval) {
flush()
}
}
/**
* Flush this log file to the physical disk
*/
def flush() : Unit = {
if (unflushed.get == 0) return
lock synchronized {
debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
System.currentTimeMillis)
segments.view.last.messageSet.flush()
unflushed.set(0)
lastflushedTime.set(System.currentTimeMillis)
}
}
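/**
* Answer an offset request: return up to maxNumOffsets segment start offsets whose segments were last
* modified at or before the requested time, in descending order. LatestTime maps to the current end of
* the log and EarliestTime to the start of the first segment.
*/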
def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
val segsArray = segments.view
val offsetTimeArray =
if (segsArray.last.size > 0)
new Array[(Long, Long)](segsArray.length + 1)
else
new Array[(Long, Long)](segsArray.length)
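// record the (start offset, last modified time) pair for each segment, plus the current log end offset
// if the active segment is non-empty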
for (i <- 0 until segsArray.length)
offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
if (segsArray.last.size > 0)
offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
var startIndex = -1
request.time match {
case OffsetRequest.LatestTime =>
startIndex = offsetTimeArray.length - 1
case OffsetRequest.EarliestTime =>
startIndex = 0
case _ =>
var isFound = false
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= request.time)
isFound = true
else
startIndex -= 1
}
}
val retSize = request.maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for (j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
ret
}
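/**
* The name of the topic this log belongs to; the log directory is named <topic>-<partition>
*/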
def getTopicName(): String =
name.substring(0, name.lastIndexOf("-"))
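/**
* The time of the last flush, in milliseconds since the epoch
*/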
def getLastFlushedTime(): Long = lastflushedTime.get
}