blob: 8cc1c945cc113bb4213c08e5efa2d26735830beb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import scala.math._
import java.io.File
import kafka.message._
import kafka.utils._
/**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
* the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
* any previous segment.
*
* A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
*/
@nonthreadsafe
class LogSegment(val messageSet: FileMessageSet,
val index: OffsetIndex,
val start: Long,
val indexIntervalBytes: Int,
time: Time) extends Range with Logging {
var firstAppendTime: Option[Long] =
if (messageSet.sizeInBytes > 0)
Some(time.milliseconds)
else
None
/* the number of bytes since we last added an entry in the offset index */
var bytesSinceLastIndexEntry = 0
@volatile var deleted = false
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,
SystemTime)
/* Return the size in bytes of this log segment */
def size: Long = messageSet.sizeInBytes()
def updateFirstAppendTime() {
if (firstAppendTime.isEmpty)
firstAppendTime = Some(time.milliseconds)
}
/**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock
*/
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(offset, messageSet.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
// append the messages
messageSet.append(messages)
updateFirstAppendTime()
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
/**
* Find the physical file position for the least offset >= the given offset. If no offset is found
* that meets this criteria before the end of the log, return null.
*/
private def translateOffset(offset: Long): OffsetPosition = {
val mapping = index.lookup(offset)
messageSet.searchFor(offset, mapping.position)
}
/**
* Read a message set from this segment beginning with the first offset
* greater than or equal to the startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*/
def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
if(maxSize == 0)
return MessageSet.Empty
val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return MessageSet.Empty
if(startPosition == null)
return MessageSet.Empty
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
case None =>
// no max offset, just use the max size they gave unmolested
maxSize
case Some(offset) => {
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset)
val endPosition =
if(mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
messageSet.read(startPosition.position, length)
}
override def toString() = "LogSegment(start=" + start + ", size=" + size + ")"
/**
* Truncate off all index and log entries with offsets greater than or equal to the current offset.
*/
def truncateTo(offset: Long) {
val mapping = translateOffset(offset)
if(mapping == null)
return
index.truncateTo(offset)
// after truncation, reset and allocate more space for the (new currently active) index
index.resize(index.maxIndexSize)
messageSet.truncateTo(mapping.position)
if (messageSet.sizeInBytes == 0)
firstAppendTime = None
}
/**
* Calculate the offset that would be used for the next message to be append to this segment.
* Note that this is expensive.
*/
def nextOffset(): Long = {
val ms = read(index.lastOffset, messageSet.sizeInBytes, None)
ms.lastOption match {
case None => start
case Some(last) => last.nextOffset
}
}
/**
* Flush this log segment to disk
*/
def flush() {
messageSet.flush()
index.flush()
}
/**
* Close this log segment
*/
def close() {
Utils.swallow(index.close)
Utils.swallow(messageSet.close)
}
}