/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.message
import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.atomic._
import kafka.utils._
/**
* An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
* will fail on an immutable message set. An optional limit and offset can be applied to the set,
* which control the starting position in the file and the effective length of the region from
* which messages will be read.
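*
* A minimal usage sketch (the file path is illustrative, and the ByteBufferMessageSet and
* Message constructors are assumed from the surrounding kafka.message package):
* {{{
*   val set = new FileMessageSet(new File("/tmp/example-log"), true)
*   set.append(new ByteBufferMessageSet(new Message("hello".getBytes)))
*   set.flush()
*   set.close()
* }}}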
*/
@nonthreadsafe
class FileMessageSet private[kafka](private[message] val channel: FileChannel,
private[message] val offset: Long,
private[message] val limit: Long,
val mutable: Boolean,
val needRecover: AtomicBoolean) extends MessageSet with Logging {
private val setSize = new AtomicLong()
private val setHighWaterMark = new AtomicLong()
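// setSize tracks the total number of bytes in this set; setHighWaterMark marks the flushed
// portion of the file, which read() uses as the upper bound for the views it hands out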
if(mutable) {
if(limit < Long.MaxValue || offset > 0)
throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
if (needRecover.get) {
// recover the log, truncating any trailing partial message and leaving the file position at the end for appends
val startMs = System.currentTimeMillis
val truncated = recover()
info("Recovery succeeded in " + (System.currentTimeMillis - startMs) / 1000 +
" seconds. " + truncated + " bytes truncated.")
}
else {
setSize.set(channel.size())
setHighWaterMark.set(sizeInBytes)
channel.position(channel.size)
}
} else {
setSize.set(scala.math.min(channel.size(), limit) - offset)
setHighWaterMark.set(sizeInBytes)
debug("initializing high water mark in immutable mode: " + highWaterMark)
}
/**
* Create a file message set with no limit or offset
*/
def this(channel: FileChannel, mutable: Boolean) =
this(channel, 0, Long.MaxValue, mutable, new AtomicBoolean(false))
/**
* Create a file message set with no limit or offset
*/
def this(file: File, mutable: Boolean) =
this(Utils.openChannel(file, mutable), mutable)
/**
* Create a file message set with no limit or offset
*/
def this(channel: FileChannel, mutable: Boolean, needRecover: AtomicBoolean) =
this(channel, 0, Long.MaxValue, mutable, needRecover)
/**
* Create a file message set with no limit or offset
*/
def this(file: File, mutable: Boolean, needRecover: AtomicBoolean) =
this(Utils.openChannel(file, mutable), mutable, needRecover)
/**
* Return a message set which is a view into this set starting from the given offset and with the given size limit.
*/
def read(readOffset: Long, size: Long): MessageSet = {
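// cap the view at the high water mark so callers never see bytes that have not yet been flushed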
new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
false, new AtomicBoolean(false))
}
/**
* Write some of this set to the given channel, returning the number of bytes written
*/
def writeTo(destChannel: GatheringByteChannel, writeOffset: Long, size: Long): Long =
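// FileChannel.transferTo can use the operating system's zero-copy path when the destination
// channel supports it, avoiding an intermediate copy through user space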
channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel)
/**
* Get an iterator over the messages in the set
*/
override def iterator: Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = offset
override def makeNext(): MessageAndOffset = {
// read the size of the item
val sizeBuffer = ByteBuffer.allocate(4)
channel.read(sizeBuffer, location)
if(sizeBuffer.hasRemaining)
return allDone()
sizeBuffer.rewind()
val size: Int = sizeBuffer.getInt()
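// a size smaller than the minimum message header indicates a truncated or corrupt entry, so stop iterating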
if (size < Message.MinHeaderSize)
return allDone()
// read the item itself
val buffer = ByteBuffer.allocate(size)
channel.read(buffer, location + 4)
if(buffer.hasRemaining)
return allDone()
buffer.rewind()
// increment the location and return the item
location += size + 4
new MessageAndOffset(new Message(buffer), location)
}
}
}
/**
* The number of bytes taken up by this file set
*/
def sizeInBytes(): Long = setSize.get()
/**
* The high water mark
*/
def highWaterMark(): Long = setHighWaterMark.get()
def checkMutable(): Unit = {
if(!mutable)
throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
}
/**
* Append the given messages to this message set
*/
def append(messages: MessageSet): Unit = {
checkMutable()
var written = 0L
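// writeTo may write fewer bytes than requested, so loop until the full set has been appended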
while(written < messages.sizeInBytes)
written += messages.writeTo(channel, 0, messages.sizeInBytes)
setSize.getAndAdd(written)
}
/**
* Commit all written data to the physical disk
*/
def flush() = {
checkMutable()
val startTime = SystemTime.milliseconds
channel.force(true)
val elapsedTime = SystemTime.milliseconds - startTime
LogFlushStats.recordFlushRequest(elapsedTime)
debug("flush time " + elapsedTime)
setHighWaterMark.set(sizeInBytes)
debug("flush high water mark:" + highWaterMark)
}
/**
* Close this message set
*/
def close() = {
if(mutable)
flush()
channel.close()
}
/**
* Recover the log up to the last complete entry. Truncate off any bytes from incomplete messages written to the file.
*/
def recover(): Long = {
checkMutable()
val len = channel.size
val buffer = ByteBuffer.allocate(4)
var validUpTo: Long = 0
var next = 0L
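// walk the file one message at a time, stopping at the first incomplete or invalid message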
do {
next = validateMessage(channel, validUpTo, len, buffer)
if(next >= 0)
validUpTo = next
} while(next >= 0)
channel.truncate(validUpTo)
setSize.set(validUpTo)
setHighWaterMark.set(validUpTo)
info("recover high water mark:" + highWaterMark)
/* This should not be necessary, but fixes bug 6191269 on some OSs. */
channel.position(validUpTo)
needRecover.set(false)
len - validUpTo
}
/**
* Read and validate a single message starting at the given offset, returning the offset of the
* next message, or -1 if the message is incomplete or invalid
*/
private def validateMessage(channel: FileChannel, start: Long, len: Long, buffer: ByteBuffer): Long = {
buffer.rewind()
var read = channel.read(buffer, start)
if(read < 4)
return -1
// check that we have sufficient bytes left in the file
val size = buffer.getInt(0)
if (size < Message.MinHeaderSize)
return -1
val next = start + 4 + size
if(next > len)
return -1
// read the message
val messageBuffer = ByteBuffer.allocate(size)
var curr = start + 4
while(messageBuffer.hasRemaining) {
read = channel.read(messageBuffer, curr)
if(read < 0)
throw new IllegalStateException("File size changed during recovery!")
else
curr += read
}
messageBuffer.rewind()
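// a message that fails validation (e.g. a bad checksum) marks the end of the recoverable region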
val message = new Message(messageBuffer)
if(!message.isValid)
return -1
else
next
}
}
trait LogFlushStatsMBean {
def getFlushesPerSecond: Double
def getAvgFlushMs: Double
def getTotalFlushMs: Long
def getMaxFlushMs: Double
def getNumFlushes: Long
}
@threadsafe
class LogFlushStats extends LogFlushStatsMBean {
private val flushRequestStats = new SnapshotStats
def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond
def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
def getTotalFlushMs: Long = flushRequestStats.getTotalMetric
def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
def getNumFlushes: Long = flushRequestStats.getNumRequests
}
object LogFlushStats extends Logging {
private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
private val stats = new LogFlushStats
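// register the stats object as an MBean so flush metrics are visible via JMX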
Utils.registerMBean(stats, LogFlushStatsMBeanName)
def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
}