blob: 2ab2e0cd2e0aacf88ef4bcdc51e10d9f5d453993 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.message
import java.nio._
import org.apache.kafka.common.record.TimestampType
import scala.math._
import kafka.utils._
import org.apache.kafka.common.utils.Utils
/**
 * Constants related to messages: field offsets/lengths of the on-wire message layout,
 * magic values, attribute bit masks, and header-size helpers.
 */
object Message {

  /**
   * The current offset and size for all the fixed-length fields.
   */
  val CrcOffset = 0
  val CrcLength = 4
  val MagicOffset = CrcOffset + CrcLength
  val MagicLength = 1
  val AttributesOffset = MagicOffset + MagicLength
  val AttributesLength = 1
  // Only message format version 1 has the timestamp field.
  val TimestampOffset = AttributesOffset + AttributesLength
  val TimestampLength = 8
  // In V0 the key size directly follows the attributes; in V1 it follows the timestamp.
  val KeySizeOffset_V0 = AttributesOffset + AttributesLength
  val KeySizeOffset_V1 = TimestampOffset + TimestampLength
  val KeySizeLength = 4
  val KeyOffset_V0 = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1 = KeySizeOffset_V1 + KeySizeLength
  val ValueSizeLength = 4

  /**
   * The amount of overhead bytes in a message.
   * This value is only used to check if the message size is valid or not. So the minimum possible message bytes is
   * used here, which comes from a message in message format V0 with empty key and value.
   */
  val MinMessageOverhead = KeyOffset_V0 + ValueSizeLength

  /**
   * The "magic" value.
   * When magic value is 0, the message uses absolute offset and does not have a timestamp field.
   * When magic value is 1, the message uses relative offset and has a timestamp field.
   */
  val MagicValue_V0: Byte = 0
  val MagicValue_V1: Byte = 1
  val CurrentMagicValue: Byte = MagicValue_V1

  /**
   * Size of all fixed-length header fields (everything except the key and value bytes), per magic value.
   */
  // Declared after the magic values it is keyed by: object vals are initialized in declaration order,
  // so referencing them earlier would observe their default value (0).
  private val MessageHeaderSizeMap: Map[Byte, Int] = Map(
    MagicValue_V0 -> (CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength),
    MagicValue_V1 -> (CrcLength + MagicLength + AttributesLength + TimestampLength + KeySizeLength + ValueSizeLength))

  /**
   * Specifies the mask for the compression code. 3 bits to hold the compression codec.
   * 0 is reserved to indicate no compression.
   */
  val CompressionCodeMask: Int = 0x07

  /**
   * Specifies the mask for timestamp type. 1 bit at the 4th least significant bit.
   * 0 for CreateTime, 1 for LogAppendTime.
   */
  val TimestampTypeMask: Byte = 0x08
  val TimestampTypeAttributeBitOffset: Int = 3

  /**
   * Compression code for uncompressed messages.
   */
  val NoCompression: Int = 0

  /**
   * To indicate timestamp is not defined so "magic" value 0 will be used.
   */
  val NoTimestamp: Long = -1

  /**
   * Give the header size difference between different message versions.
   *
   * @param fromMagicValue magic value of the source format
   * @param toMagicValue   magic value of the target format
   * @return header size of `toMagicValue` minus header size of `fromMagicValue`
   *         (positive when converting from V0 to V1)
   * @throws NoSuchElementException if either magic value is not a known message format version
   */
  def headerSizeDiff(fromMagicValue: Byte, toMagicValue: Byte) : Int =
    MessageHeaderSizeMap(toMagicValue) - MessageHeaderSizeMap(fromMagicValue)
}
/**
 * A message. The format of an N byte message is the following:
 *
 * 1. 4 byte CRC32 of the message
 * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
 * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
 *    bit 0 ~ 2 : Compression codec.
 *      0 : no compression
 *      1 : gzip
 *      2 : snappy
 *      3 : lz4
 *    bit 3 : Timestamp type
 *      0 : create time
 *      1 : log append time
 *    bit 4 ~ 7 : reserved
 * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
 * 5. 4 byte key length, containing length K
 * 6. K byte key
 * 7. 4 byte payload length, containing length V
 * 8. V byte payload
 *
 * A negative key or payload length (the constructors write -1) denotes a null key or payload.
 *
 * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
 * @param buffer the byte buffer of this message.
 * @param wrapperMessageTimestamp the wrapper message timestamp, which is only defined when the message is an inner
 *                                message of a compressed message.
 * @param wrapperMessageTimestampType the wrapper message timestamp type, which is only defined when the message is an
 *                                    inner message of a compressed message.
 */
class Message(val buffer: ByteBuffer,
              private val wrapperMessageTimestamp: Option[Long] = None,
              private val wrapperMessageTimestampType: Option[TimestampType] = None) {

  import kafka.message.Message._

  /**
   * A constructor to create a Message.
   * @param bytes The payload of the message (null, if none)
   * @param key The key of the message (null, if none)
   * @param timestamp The timestamp of the message. Must be NoTimestamp when `magicValue` is MagicValue_V0.
   * @param timestampType The timestamp type of the message. It is only recorded in the attributes when `codec`
   *                      is an actual compression codec.
   * @param codec The compression codec used on the contents of the message (if any)
   * @param payloadOffset The offset into the payload array used to extract payload
   * @param payloadSize The size of the payload to use; a negative value means "from payloadOffset to the end of bytes"
   * @param magicValue the magic value to use
   * @throws IllegalArgumentException if `magicValue` is unknown or `timestamp` is invalid for it
   */
  def this(bytes: Array[Byte],
           key: Array[Byte],
           timestamp: Long,
           timestampType: TimestampType,
           codec: CompressionCodec,
           payloadOffset: Int,
           payloadSize: Int,
           magicValue: Byte) = {
    this(ByteBuffer.allocate(Message.CrcLength +
                             Message.MagicLength +
                             Message.AttributesLength +
                             (if (magicValue == Message.MagicValue_V0) 0
                              else Message.TimestampLength) +
                             Message.KeySizeLength +
                             (if(key == null) 0 else key.length) +
                             Message.ValueSizeLength +
                             (if(bytes == null) 0
                              else if(payloadSize >= 0) payloadSize
                              else bytes.length - payloadOffset)))

    validateTimestampAndMagicValue(timestamp, magicValue)
    // skip crc, we will fill that in at the end
    buffer.position(MagicOffset)
    buffer.put(magicValue)
    val attributes: Byte =
      if (codec.codec > 0)
        timestampType.updateAttributes((CompressionCodeMask & codec.codec).toByte)
      else 0
    buffer.put(attributes)
    // Only put timestamp when "magic" value is greater than 0
    if (magicValue > MagicValue_V0)
      buffer.putLong(timestamp)
    if(key == null) {
      // -1 marks a null key (see hasKey / sliceDelimited)
      buffer.putInt(-1)
    } else {
      buffer.putInt(key.length)
      buffer.put(key, 0, key.length)
    }
    val size = if(bytes == null) -1
               else if(payloadSize >= 0) payloadSize
               else bytes.length - payloadOffset
    // -1 marks a null payload (see isNull / sliceDelimited)
    buffer.putInt(size)
    if(bytes != null)
      buffer.put(bytes, payloadOffset, size)
    // leave the buffer positioned at the start of the message for readers
    buffer.rewind()

    // now compute the checksum and fill it in
    Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
  }

  /** Create a message with a CreateTime timestamp type and the whole of `bytes` as payload. */
  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
    this(bytes = bytes, key = key, timestamp = timestamp, timestampType = TimestampType.CREATE_TIME, codec = codec, payloadOffset = 0, payloadSize = -1, magicValue = magicValue)

  /** Create a message with no key. */
  def this(bytes: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
    this(bytes = bytes, key = null, timestamp = timestamp, codec = codec, magicValue = magicValue)

  /** Create an uncompressed message. */
  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, magicValue: Byte) =
    this(bytes = bytes, key = key, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)

  /** Create an uncompressed message with no key. */
  def this(bytes: Array[Byte], timestamp: Long, magicValue: Byte) =
    this(bytes = bytes, key = null, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)

  /** Create an uncompressed, keyless message in the current format with no timestamp. */
  def this(bytes: Array[Byte]) =
    this(bytes = bytes, key = null, timestamp = Message.NoTimestamp, codec = NoCompressionCodec, magicValue = Message.CurrentMagicValue)

  /**
   * Compute the checksum of the message from the message contents.
   * The CRC covers every byte after the CRC field (magic byte through the end of the payload).
   */
  def computeChecksum: Long =
    CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)

  /**
   * Retrieve the previously computed CRC for this message.
   */
  def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)

  /**
   * Returns true if the crc stored with the message matches the crc computed off the message contents.
   */
  def isValid: Boolean = checksum == computeChecksum

  /**
   * Throw an InvalidMessageException if isValid is false for this message.
   */
  def ensureValid() {
    if (!isValid)
      throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})")
  }

  /**
   * The complete serialized size of this message in bytes (including crc, header attributes, etc).
   */
  def size: Int = buffer.limit

  /**
   * The position where the key size is stored.
   */
  private def keySizeOffset = {
    if (magic == MagicValue_V0) KeySizeOffset_V0
    else KeySizeOffset_V1
  }

  /**
   * The length of the key in bytes.
   */
  def keySize: Int = buffer.getInt(keySizeOffset)

  /**
   * Does the message have a key?
   */
  def hasKey: Boolean = keySize >= 0

  /**
   * The position where the payload size is stored.
   * A null key (negative size) occupies no key bytes.
   */
  private def payloadSizeOffset = {
    if (magic == MagicValue_V0) KeyOffset_V0 + max(0, keySize)
    else KeyOffset_V1 + max(0, keySize)
  }

  /**
   * The length of the message value in bytes.
   */
  def payloadSize: Int = buffer.getInt(payloadSizeOffset)

  /**
   * Is the payload of this message null.
   */
  def isNull: Boolean = payloadSize < 0

  /**
   * The magic version of this message.
   */
  def magic: Byte = buffer.get(MagicOffset)

  /**
   * The attributes stored with this message.
   */
  def attributes: Byte = buffer.get(AttributesOffset)

  /**
   * The timestamp of the message, only available when the "magic" value is greater than 0.
   * For magic 0 this returns NoTimestamp.
   * When magic > 0, The timestamp of a message is determined in the following way:
   * 1. wrapperMessageTimestampType = None and wrapperMessageTimestamp is None - Uncompressed message, timestamp and
   *    timestamp type are in the message.
   * 2. wrapperMessageTimestampType = LogAppendTime and wrapperMessageTimestamp is defined - Compressed message using
   *    LogAppendTime; the wrapper timestamp is used.
   * 3. wrapperMessageTimestampType = CreateTime and wrapperMessageTimestamp is defined - Compressed message using
   *    CreateTime; the inner message's own timestamp is used.
   */
  def timestamp: Long = {
    if (magic == MagicValue_V0)
      Message.NoTimestamp
    // Case 2
    else if (wrapperMessageTimestampType.exists(_ == TimestampType.LOG_APPEND_TIME) && wrapperMessageTimestamp.isDefined)
      wrapperMessageTimestamp.get
    else // case 1, 3
      buffer.getLong(Message.TimestampOffset)
  }

  /**
   * The timestamp type of the message.
   * Magic 0 messages have no timestamp type. Otherwise the wrapper's type wins when present,
   * falling back to the timestamp-type bit in this message's attributes.
   */
  def timestampType: TimestampType = {
    if (magic == MagicValue_V0)
      TimestampType.NO_TIMESTAMP_TYPE
    else
      wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))
  }

  /**
   * The compression codec used with this message.
   */
  def compressionCodec: CompressionCodec =
    CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)

  /**
   * A ByteBuffer containing the content of the message, or null if the payload is null.
   */
  def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)

  /**
   * A ByteBuffer containing the message key, or null if the key is null.
   */
  def key: ByteBuffer = sliceDelimited(keySizeOffset)

  /**
   * Convert the message to the specified format.
   * Returns this message unchanged when it is already in that format; otherwise a new message backed by a
   * freshly allocated buffer.
   */
  def toFormatVersion(toMagicValue: Byte): Message = {
    if (magic == toMagicValue)
      this
    else {
      val byteBuffer = ByteBuffer.allocate(size + Message.headerSizeDiff(magic, toMagicValue))
      // Copy bytes from old messages to new message
      convertToBuffer(toMagicValue, byteBuffer, Message.NoTimestamp)
      new Message(byteBuffer)
    }
  }

  /**
   * Write this message, converted to message format `toMagicValue`, into `byteBuffer` and fill in its CRC.
   *
   * The message is written at absolute index 0 of `byteBuffer`, and `byteBuffer` is rewound afterwards.
   * Both this message's buffer and `byteBuffer` must be array-backed (their arrays are accessed directly).
   *
   * @param toMagicValue  the target message format version
   * @param byteBuffer    destination; must have room for `size + headerSizeDiff(magic, toMagicValue)` bytes
   * @param now           timestamp written when up-converting with LOG_APPEND_TIME
   * @param timestampType timestamp type recorded when up-converting
   * @throws IndexOutOfBoundsException if `byteBuffer` does not have enough remaining capacity
   */
  def convertToBuffer(toMagicValue: Byte,
                      byteBuffer: ByteBuffer,
                      now: Long,
                      timestampType: TimestampType = wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))) {
    // NOTE(review): the capacity check uses remaining() while the writes below use absolute index 0;
    // they agree only when byteBuffer.position is 0.
    if (byteBuffer.remaining() < size + headerSizeDiff(magic, toMagicValue))
      throw new IndexOutOfBoundsException("The byte buffer does not have enough capacity to hold new message format " +
        s"version $toMagicValue")
    if (toMagicValue == Message.MagicValue_V1) {
      // Up-conversion, reserve CRC and update magic byte
      byteBuffer.position(Message.MagicOffset)
      byteBuffer.put(Message.MagicValue_V1)
      byteBuffer.put(timestampType.updateAttributes(attributes))
      // Up-conversion, insert the timestamp field
      if (timestampType == TimestampType.LOG_APPEND_TIME)
        byteBuffer.putLong(now)
      else
        byteBuffer.putLong(Message.NoTimestamp)
      byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V0, size - Message.KeySizeOffset_V0)
    } else {
      // Down-conversion, reserve CRC and update magic byte
      byteBuffer.position(Message.MagicOffset)
      byteBuffer.put(Message.MagicValue_V0)
      // V0 has no timestamp type, so clear the timestamp-type attribute bit
      byteBuffer.put(TimestampType.CREATE_TIME.updateAttributes(attributes))
      // Down-conversion, skip the timestamp field
      byteBuffer.put(buffer.array(), buffer.arrayOffset() + Message.KeySizeOffset_V1, size - Message.KeySizeOffset_V1)
    }
    // update crc value
    val newMessage = new Message(byteBuffer)
    Utils.writeUnsignedInt(byteBuffer, Message.CrcOffset, newMessage.computeChecksum)
    byteBuffer.rewind()
  }

  /**
   * Read a size-delimited byte buffer starting at the given offset.
   * Returns a read view sharing this message's content, or null when the stored size is negative.
   */
  private def sliceDelimited(start: Int): ByteBuffer = {
    val size = buffer.getInt(start)
    if (size < 0) {
      null
    } else {
      val view = buffer.duplicate()
      // skip the 4-byte size field that precedes the data
      view.position(start + 4)
      val slice = view.slice()
      slice.limit(size)
      slice
    }
  }

  /**
   * Validate the timestamp and "magic" value.
   * @throws IllegalArgumentException if the magic value is unknown, the timestamp is negative (other than
   *                                  NoTimestamp), or a timestamp is supplied for a magic 0 message
   */
  private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte) {
    if (magic != MagicValue_V0 && magic != MagicValue_V1)
      throw new IllegalArgumentException(s"Invalid magic value $magic")
    if (timestamp < 0 && timestamp != NoTimestamp)
      throw new IllegalArgumentException(s"Invalid message timestamp $timestamp")
    if (magic == MagicValue_V0 && timestamp != NoTimestamp)
      throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be ${NoTimestamp} when magic = ${MagicValue_V0}")
  }

  override def toString(): String = {
    if (magic == MagicValue_V0)
      s"Message(magic = $magic, attributes = $attributes, crc = $checksum, key = $key, payload = $payload)"
    else
      s"Message(magic = $magic, attributes = $attributes, $timestampType = $timestamp, crc = $checksum, key = $key, payload = $payload)"
  }

  /** Two messages are equal when their underlying buffers are equal. */
  override def equals(any: Any): Boolean = {
    any match {
      case that: Message => this.buffer.equals(that.buffer)
      case _ => false
    }
  }

  override def hashCode(): Int = buffer.hashCode
}