blob: e0efb3d39838037a88b63561196fc67283f7f766 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.message
import java.nio._
import org.apache.kafka.common.record.{Record, TimestampType}
import scala.math._
import kafka.utils._
import org.apache.kafka.common.utils.Utils
/**
 * Companion holding the layout constants of the on-wire message format together with
 * a few helpers for converting between representations.
 */
object Message {

  /**
   * Offsets and lengths (in bytes) of the fixed-length header fields. Each offset is
   * derived from the offset and length of the field that precedes it.
   */
  val CrcOffset: Int = 0
  val CrcLength: Int = 4
  val MagicOffset: Int = CrcOffset + CrcLength
  val MagicLength: Int = 1
  val AttributesOffset: Int = MagicOffset + MagicLength
  val AttributesLength: Int = 1
  // The timestamp field exists only in message format version 1.
  val TimestampOffset: Int = AttributesOffset + AttributesLength
  val TimestampLength: Int = 8
  // In V0 the key size directly follows the attributes; in V1 it follows the timestamp.
  val KeySizeOffset_V0: Int = AttributesOffset + AttributesLength
  val KeySizeOffset_V1: Int = TimestampOffset + TimestampLength
  val KeySizeLength: Int = 4
  val KeyOffset_V0: Int = KeySizeOffset_V0 + KeySizeLength
  val KeyOffset_V1: Int = KeySizeOffset_V1 + KeySizeLength
  val ValueSizeLength: Int = 4

  /**
   * Header size in bytes keyed by magic value. The V1 header is the V0 header plus the
   * timestamp field. Literal byte keys are used on purpose: the named magic constants
   * are initialised further down in this object.
   */
  private val MessageHeaderSizeMap: Map[Byte, Int] = {
    val headerSizeV0 = CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength
    val headerSizeV1 = headerSizeV0 + TimestampLength
    Map((0: Byte) -> headerSizeV0, (1: Byte) -> headerSizeV1)
  }

  /**
   * The smallest possible number of overhead bytes in a message.
   * It is only used to check whether a message size is valid, so it is taken from the
   * smallest layout: a message in format V0 with an empty key and an empty value.
   */
  val MinMessageOverhead: Int = KeyOffset_V0 + ValueSizeLength

  /**
   * The "magic" values.
   * Magic 0: the message uses absolute offsets and has no timestamp field.
   * Magic 1: the message uses relative offsets and has a timestamp field.
   */
  val MagicValue_V0: Byte = 0
  val MagicValue_V1: Byte = 1
  val CurrentMagicValue: Byte = 1

  /**
   * Mask selecting the compression codec from the attributes byte (the 3 lowest bits).
   * A codec value of 0 is reserved to mean "no compression".
   */
  val CompressionCodeMask: Int = 0x07

  /**
   * Mask selecting the timestamp type from the attributes byte (the 4th least
   * significant bit): 0 means CreateTime, 1 means LogAppendTime.
   */
  val TimestampTypeMask: Byte = 0x08
  val TimestampTypeAttributeBitOffset: Int = 3

  /**
   * Compression code of an uncompressed message.
   */
  val NoCompression: Int = 0

  /**
   * Marks the timestamp as undefined, in which case "magic" value 0 will be used.
   */
  val NoTimestamp: Long = -1

  /**
   * Computes how many header bytes are gained (positive) or lost (negative) when a
   * message is converted from one format version to another.
   *
   * @param fromMagicValue the magic value of the source format
   * @param toMagicValue the magic value of the target format
   * @return header size of the target format minus header size of the source format
   */
  def headerSizeDiff(fromMagicValue: Byte, toMagicValue: Byte) : Int = {
    val targetHeaderSize = MessageHeaderSizeMap(toMagicValue)
    val sourceHeaderSize = MessageHeaderSizeMap(fromMagicValue)
    targetHeaderSize - sourceHeaderSize
  }

  /**
   * Wraps the buffer of the given record in a [[Message]], carrying over the wrapper
   * timestamp and wrapper timestamp type when the record has them (null maps to None).
   *
   * @param record the record to convert
   * @return a message backed by the record's buffer
   */
  def fromRecord(record: Record): Message = {
    val wrapperTimestamp: Option[Long] = record.wrapperRecordTimestamp match {
      case null => None
      case recordTimestamp => Some(recordTimestamp)
    }
    val wrapperTimestampType = Option(record.wrapperRecordTimestampType)
    new Message(record.buffer, wrapperTimestamp, wrapperTimestampType)
  }
}
/**
 * A message. The format of an N byte message is the following:
 *
 * 1. 4 byte CRC32 of the message
 * 2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
 * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
 *    bit 0 ~ 2 : Compression codec.
 *      0 : no compression
 *      1 : gzip
 *      2 : snappy
 *      3 : lz4
 *    bit 3 : Timestamp type
 *      0 : create time
 *      1 : log append time
 *    bit 4 ~ 7 : reserved
 * 4. (Optional) 8 byte timestamp only if "magic" identifier is greater than 0
 * 5. 4 byte key length, containing length K
 * 6. K byte key
 * 7. 4 byte payload length, containing length V
 * 8. V byte payload
 *
 * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents.
 * @param buffer the byte buffer of this message.
 * @param wrapperMessageTimestamp the wrapper message timestamp, which is only defined when the message is an inner
 *                                message of a compressed message.
 * @param wrapperMessageTimestampType the wrapper message timestamp type, which is only defined when the message is an
 *                                    inner message of a compressed message.
 */
class Message(val buffer: ByteBuffer,
              private val wrapperMessageTimestamp: Option[Long] = None,
              private val wrapperMessageTimestampType: Option[TimestampType] = None) {

  import kafka.message.Message._

  /**
   * View this message as a Record over the same buffer, passing along the wrapper
   * timestamp and wrapper timestamp type (null when undefined) if a wrapper timestamp
   * is present.
   */
  private[message] def asRecord: Record = wrapperMessageTimestamp match {
    case None => new Record(buffer)
    case Some(timestamp) => new Record(buffer, timestamp, wrapperMessageTimestampType.orNull)
  }

  /**
   * A constructor to create a Message
   * @param bytes The payload of the message (null for a null payload)
   * @param key The key of the message (null, if none)
   * @param timestamp The timestamp of the message.
   * @param timestampType The timestamp type of the message. It is only recorded in the attributes
   *                      when the magic value is greater than 0.
   * @param codec The compression codec used on the contents of the message (if any)
   * @param payloadOffset The offset into the payload array used to extract payload
   * @param payloadSize The size of the payload to use; a negative value means "everything from
   *                    payloadOffset to the end of bytes"
   * @param magicValue the magic value to use
   * @throws IllegalArgumentException if the magic value is unknown, or the timestamp is invalid
   *                                  for the given magic value
   */
  def this(bytes: Array[Byte],
           key: Array[Byte],
           timestamp: Long,
           timestampType: TimestampType,
           codec: CompressionCodec,
           payloadOffset: Int,
           payloadSize: Int,
           magicValue: Byte) = {
    this(ByteBuffer.allocate(Message.CrcLength +
                             Message.MagicLength +
                             Message.AttributesLength +
                             (if (magicValue == Message.MagicValue_V0) 0
                              else Message.TimestampLength) +
                             Message.KeySizeLength +
                             (if (key == null) 0 else key.length) +
                             Message.ValueSizeLength +
                             (if (bytes == null) 0
                              else if (payloadSize >= 0) payloadSize
                              else bytes.length - payloadOffset)))
    validateTimestampAndMagicValue(timestamp, magicValue)
    // skip crc, we will fill that in at the end
    buffer.position(MagicOffset)
    buffer.put(magicValue)
    // The compression codec occupies the low bits of the attributes byte.
    val compressionAttributes: Byte =
      if (codec.codec > 0) (CompressionCodeMask & codec.codec).toByte
      else 0
    // The timestamp type must be recorded whether or not the message is compressed, otherwise
    // an uncompressed message would lose its timestamp type. Format V0 has no timestamp, so
    // the timestamp type is left out of its attributes.
    val attributes: Byte =
      if (magicValue > MagicValue_V0) timestampType.updateAttributes(compressionAttributes)
      else compressionAttributes
    buffer.put(attributes)
    // Only put timestamp when "magic" value is greater than 0
    if (magicValue > MagicValue_V0)
      buffer.putLong(timestamp)
    if (key == null) {
      // a key size of -1 marks a message without key
      buffer.putInt(-1)
    } else {
      buffer.putInt(key.length)
      buffer.put(key, 0, key.length)
    }
    // Same size computation as in the buffer allocation above, except that a null payload is
    // written as size -1 while occupying 0 bytes.
    val size =
      if (bytes == null) -1
      else if (payloadSize >= 0) payloadSize
      else bytes.length - payloadOffset
    buffer.putInt(size)
    if (bytes != null)
      buffer.put(bytes, payloadOffset, size)
    buffer.rewind()
    // now compute the checksum and fill it in
    Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum)
  }

  /** Create a keyed message with a CREATE_TIME timestamp that uses the whole `bytes` array as payload. */
  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
    this(bytes = bytes, key = key, timestamp = timestamp, timestampType = TimestampType.CREATE_TIME, codec = codec, payloadOffset = 0, payloadSize = -1, magicValue = magicValue)

  /** Create a message without key. */
  def this(bytes: Array[Byte], timestamp: Long, codec: CompressionCodec, magicValue: Byte) =
    this(bytes = bytes, key = null, timestamp = timestamp, codec = codec, magicValue = magicValue)

  /** Create an uncompressed keyed message. */
  def this(bytes: Array[Byte], key: Array[Byte], timestamp: Long, magicValue: Byte) =
    this(bytes = bytes, key = key, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)

  /** Create an uncompressed message without key. */
  def this(bytes: Array[Byte], timestamp: Long, magicValue: Byte) =
    this(bytes = bytes, key = null, timestamp = timestamp, codec = NoCompressionCodec, magicValue = magicValue)

  /** Create an uncompressed message without key and without timestamp, using the current magic value. */
  def this(bytes: Array[Byte]) =
    this(bytes = bytes, key = null, timestamp = Message.NoTimestamp, codec = NoCompressionCodec, magicValue = Message.CurrentMagicValue)

  /**
   * Compute the checksum of the message from the message contents, i.e. everything
   * from the magic byte up to the buffer limit (the stored crc itself is excluded)
   */
  def computeChecksum: Long =
    Utils.computeChecksum(buffer, MagicOffset, buffer.limit - MagicOffset)

  /**
   * Retrieve the previously computed CRC for this message
   */
  def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)

  /**
   * Returns true if the crc stored with the message matches the crc computed off the message contents
   */
  def isValid: Boolean = checksum == computeChecksum

  /**
   * Throw an InvalidMessageException if isValid is false for this message
   */
  def ensureValid(): Unit = {
    if (!isValid)
      throw new InvalidMessageException(s"Message is corrupt (stored crc = ${checksum}, computed crc = ${computeChecksum})")
  }

  /**
   * The complete serialized size of this message in bytes (including crc, header attributes, etc)
   */
  def size: Int = buffer.limit

  /**
   * The position where the key size is stored.
   */
  private def keySizeOffset: Int =
    if (magic == MagicValue_V0) KeySizeOffset_V0
    else KeySizeOffset_V1

  /**
   * The length of the key in bytes (negative when the message has no key)
   */
  def keySize: Int = buffer.getInt(keySizeOffset)

  /**
   * Does the message have a key?
   */
  def hasKey: Boolean = keySize >= 0

  /**
   * The position where the payload size is stored. A missing key (negative key size)
   * occupies no bytes, hence the max with 0.
   */
  private def payloadSizeOffset: Int = {
    val keyOffset = if (magic == MagicValue_V0) KeyOffset_V0 else KeyOffset_V1
    keyOffset + max(0, keySize)
  }

  /**
   * The length of the message value in bytes (negative when the payload is null)
   */
  def payloadSize: Int = buffer.getInt(payloadSizeOffset)

  /**
   * Is the payload of this message null
   */
  def isNull: Boolean = payloadSize < 0

  /**
   * The magic version of this message
   */
  def magic: Byte = buffer.get(MagicOffset)

  /**
   * The attributes stored with this message
   */
  def attributes: Byte = buffer.get(AttributesOffset)

  /**
   * The timestamp of the message, only available when the "magic" value is greater than 0
   * (otherwise NoTimestamp is returned).
   * When magic > 0, The timestamp of a message is determined in the following way:
   * 1. wrapperMessageTimestampType = None and wrapperMessageTimestamp is None - Uncompressed message, timestamp and timestamp type are in the message.
   * 2. wrapperMessageTimestampType = LogAppendTime and wrapperMessageTimestamp is defined - Compressed message using LogAppendTime
   * 3. wrapperMessageTimestampType = CreateTime and wrapperMessageTimestamp is defined - Compressed message using CreateTime
   */
  def timestamp: Long = {
    if (magic == MagicValue_V0)
      Message.NoTimestamp
    else wrapperMessageTimestamp match {
      // Case 2: the wrapper timestamp overrides the one stored in the message
      case Some(wrapperTimestamp) if wrapperMessageTimestampType.exists(_ == TimestampType.LOG_APPEND_TIME) =>
        wrapperTimestamp
      // Case 1, 3: use the timestamp stored in the message itself
      case _ =>
        buffer.getLong(Message.TimestampOffset)
    }
  }

  /**
   * The timestamp type of the message: NO_TIMESTAMP_TYPE for magic 0, otherwise the wrapper
   * timestamp type when defined, falling back to the type encoded in the attributes
   */
  def timestampType: TimestampType = {
    if (magic == MagicValue_V0)
      TimestampType.NO_TIMESTAMP_TYPE
    else
      wrapperMessageTimestampType.getOrElse(TimestampType.forAttributes(attributes))
  }

  /**
   * The compression codec used with this message
   */
  def compressionCodec: CompressionCodec =
    CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)

  /**
   * A ByteBuffer containing the content of the message (null if the payload is null)
   */
  def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)

  /**
   * A ByteBuffer containing the message key (null if the message has no key)
   */
  def key: ByteBuffer = sliceDelimited(keySizeOffset)

  /**
   * Read a size-delimited byte buffer starting at the given offset
   *
   * @param start the position of the 4 byte size field that precedes the data
   * @return a slice covering exactly the delimited bytes, or null if the stored size is negative
   */
  private def sliceDelimited(start: Int): ByteBuffer = {
    val size = buffer.getInt(start)
    if (size < 0) {
      null
    } else {
      // Work on a duplicate so that the position and limit of the message buffer stay untouched.
      val positioned = buffer.duplicate()
      positioned.position(start + 4)
      val slice = positioned.slice()
      slice.limit(size)
      slice.rewind()
      slice
    }
  }

  /**
   * Validate the timestamp and "magic" value
   *
   * @throws IllegalArgumentException if the magic value is neither 0 nor 1, if the timestamp is
   *                                  negative without being NoTimestamp, or if a timestamp is
   *                                  given for magic value 0
   */
  private def validateTimestampAndMagicValue(timestamp: Long, magic: Byte): Unit = {
    if (magic != MagicValue_V0 && magic != MagicValue_V1)
      throw new IllegalArgumentException(s"Invalid magic value $magic")
    if (timestamp < 0 && timestamp != NoTimestamp)
      throw new IllegalArgumentException(s"Invalid message timestamp $timestamp")
    if (magic == MagicValue_V0 && timestamp != NoTimestamp)
      throw new IllegalArgumentException(s"Invalid timestamp $timestamp. Timestamp must be $NoTimestamp when magic = $MagicValue_V0")
  }

  /** String form of the message; timestamp information is only included when magic is greater than 0. */
  override def toString: String = {
    if (magic == MagicValue_V0)
      s"Message(magic = $magic, attributes = $attributes, crc = $checksum, key = $key, payload = $payload)"
    else
      s"Message(magic = $magic, attributes = $attributes, $timestampType = $timestamp, crc = $checksum, key = $key, payload = $payload)"
  }

  /** Two messages are equal when their underlying buffers are equal. */
  override def equals(any: Any): Boolean = {
    any match {
      case that: Message => this.buffer.equals(that.buffer)
      case _ => false
    }
  }

  /** Hash code of the underlying buffer, consistent with equals. */
  override def hashCode(): Int = buffer.hashCode
}