| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.message |
| |
| import kafka.utils.{IteratorTemplate, Logging} |
| import kafka.common.{KafkaException, LongRef} |
| import java.nio.ByteBuffer |
| import java.nio.channels._ |
| import java.io._ |
| import java.util.ArrayDeque |
| import java.util.concurrent.atomic.AtomicLong |
| |
| import scala.collection.JavaConverters._ |
| |
| import org.apache.kafka.common.errors.InvalidTimestampException |
| import org.apache.kafka.common.record.TimestampType |
| import org.apache.kafka.common.utils.Utils |
| |
| import scala.collection.mutable |
| |
| object ByteBufferMessageSet { |
| |
| private def create(offsetAssigner: OffsetAssigner, |
| compressionCodec: CompressionCodec, |
| wrapperMessageTimestamp: Option[Long], |
| timestampType: TimestampType, |
| messages: Message*): ByteBuffer = { |
| if (messages.isEmpty) |
| MessageSet.Empty.buffer |
| else if (compressionCodec == NoCompressionCodec) { |
| val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages)) |
| for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset()) |
| buffer.rewind() |
| buffer |
| } else { |
| val magicAndTimestamp = wrapperMessageTimestamp match { |
| case Some(ts) => MagicAndTimestamp(messages.head.magic, ts) |
| case None => MessageSet.magicAndLargestTimestamp(messages) |
| } |
| var offset = -1L |
| val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16)) |
| messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream => |
| val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) |
| try { |
| for (message <- messages) { |
| offset = offsetAssigner.nextAbsoluteOffset() |
| if (message.magic != magicAndTimestamp.magic) |
| throw new IllegalArgumentException("Messages in the message set must have same magic value") |
| // Use inner offset if magic value is greater than 0 |
| if (magicAndTimestamp.magic > Message.MagicValue_V0) |
| output.writeLong(offsetAssigner.toInnerOffset(offset)) |
| else |
| output.writeLong(offset) |
| output.writeInt(message.size) |
| output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) |
| } |
| } finally { |
| output.close() |
| } |
| } |
| val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead) |
| writeMessage(buffer, messageWriter, offset) |
| buffer.rewind() |
| buffer |
| } |
| } |
| |
| /** Deep iterator that decompresses the message sets and adjusts timestamp and offset if needed. */ |
| def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = { |
| |
| import Message._ |
| |
| new IteratorTemplate[MessageAndOffset] { |
| |
| val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset |
| val wrapperMessageTimestampOpt: Option[Long] = |
| if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None |
| val wrapperMessageTimestampTypeOpt: Option[TimestampType] = |
| if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None |
| if (wrapperMessage.payload == null) |
| throw new KafkaException(s"Message payload is null: $wrapperMessage") |
| val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) |
| val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream)) |
| var lastInnerOffset = -1L |
| |
| val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) { |
| val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]() |
| try { |
| while (true) |
| innerMessageAndOffsets.add(readMessageFromStream()) |
| } catch { |
| case eofe: EOFException => |
| compressed.close() |
| case ioe: IOException => |
| throw new KafkaException(ioe) |
| } |
| Some(innerMessageAndOffsets) |
| } else None |
| |
| private def readMessageFromStream(): MessageAndOffset = { |
| val innerOffset = compressed.readLong() |
| val recordSize = compressed.readInt() |
| |
| if (recordSize < MinMessageOverhead) |
| throw new InvalidMessageException(s"Message found with corrupt size `$recordSize` in deep iterator") |
| |
| // read the record into an intermediate record buffer (i.e. extra copy needed) |
| val bufferArray = new Array[Byte](recordSize) |
| compressed.readFully(bufferArray, 0, recordSize) |
| val buffer = ByteBuffer.wrap(bufferArray) |
| |
| // Override the timestamp if necessary |
| val newMessage = new Message(buffer, wrapperMessageTimestampOpt, wrapperMessageTimestampTypeOpt) |
| |
| // Inner message and wrapper message must have same magic value |
| if (newMessage.magic != wrapperMessage.magic) |
| throw new IllegalStateException(s"Compressed message has magic value ${wrapperMessage.magic} " + |
| s"but inner message has magic value ${newMessage.magic}") |
| lastInnerOffset = innerOffset |
| new MessageAndOffset(newMessage, innerOffset) |
| } |
| |
| override def makeNext(): MessageAndOffset = { |
| messageAndOffsets match { |
| // Using inner offset and timestamps |
| case Some(innerMessageAndOffsets) => |
| innerMessageAndOffsets.pollFirst() match { |
| case null => allDone() |
| case MessageAndOffset(message, offset) => |
| val relativeOffset = offset - lastInnerOffset |
| val absoluteOffset = wrapperMessageOffset + relativeOffset |
| new MessageAndOffset(message, absoluteOffset) |
| } |
| // Not using inner offset and timestamps |
| case None => |
| try readMessageFromStream() |
| catch { |
| case eofe: EOFException => |
| compressed.close() |
| allDone() |
| case ioe: IOException => |
| throw new KafkaException(ioe) |
| } |
| } |
| } |
| } |
| } |
| |
| private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) { |
| buffer.putLong(offset) |
| buffer.putInt(message.size) |
| buffer.put(message.buffer) |
| message.buffer.rewind() |
| } |
| |
| private[kafka] def writeMessage(buffer: ByteBuffer, messageWriter: MessageWriter, offset: Long) { |
| buffer.putLong(offset) |
| buffer.putInt(messageWriter.size) |
| messageWriter.writeTo(buffer) |
| } |
| } |
| |
| private object OffsetAssigner { |
| |
| def apply(offsetCounter: LongRef, size: Int): OffsetAssigner = |
| new OffsetAssigner(offsetCounter.value to offsetCounter.addAndGet(size)) |
| |
| } |
| |
| private class OffsetAssigner(offsets: Seq[Long]) { |
| private var index = 0 |
| |
| def nextAbsoluteOffset(): Long = { |
| val result = offsets(index) |
| index += 1 |
| result |
| } |
| |
| def toInnerOffset(offset: Long): Long = offset - offsets.head |
| |
| } |
| |
| /** |
| * A sequence of messages stored in a byte buffer |
| * |
| * There are two ways to create a ByteBufferMessageSet |
| * |
| * Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method. |
| * |
| * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method. |
| * |
| * |
| * Message format v1 has the following changes: |
| * - For non-compressed messages, timestamp and timestamp type attributes have been added. The offsets of |
| * the messages remain absolute offsets. |
| * - For compressed messages, timestamp and timestamp type attributes have been added and inner offsets (IO) are used |
| * for inner messages of compressed messages (see offset calculation details below). The timestamp type |
| * attribute is only set in wrapper messages. Inner messages always have CreateTime as the timestamp type in attributes. |
| * |
| * We set the timestamp in the following way: |
| * For non-compressed messages: the timestamp and timestamp type message attributes are set and used. |
| * For compressed messages: |
| * 1. Wrapper messages' timestamp type attribute is set to the proper value |
| * 2. Wrapper messages' timestamp is set to: |
| * - the max timestamp of inner messages if CreateTime is used |
| * - the current server time if wrapper message's timestamp = LogAppendTime. |
| * In this case the wrapper message timestamp is used and all the timestamps of inner messages are ignored. |
| * 3. Inner messages' timestamp will be: |
| * - used when wrapper message's timestamp type is CreateTime |
| * - ignored when wrapper message's timestamp type is LogAppendTime |
| * 4. Inner messages' timestamp type will always be ignored with one exception: producers must set the inner message |
| * timestamp type to CreateTime, otherwise the messages will be rejected by broker. |
| * |
| * Absolute offsets are calculated in the following way: |
| * Ideally the conversion from relative offset(RO) to absolute offset(AO) should be: |
| * |
| * AO = AO_Of_Last_Inner_Message + RO |
| * |
| * However, note that the message sets sent by producers are compressed in a streaming way. |
| * And the relative offset of an inner message compared with the last inner message is not known until |
| * the last inner message is written. |
| * Unfortunately we are not able to change the previously written messages after the last message is written to |
| * the message set when stream compression is used. |
| * |
| * To solve this issue, we use the following solution: |
| * |
| * 1. When the producer creates a message set, it simply writes all the messages into a compressed message set with |
| * offset 0, 1, ... (inner offset). |
| * 2. The broker will set the offset of the wrapper message to the absolute offset of the last message in the |
| * message set. |
| * 3. When a consumer sees the message set, it first decompresses the entire message set to find out the inner |
| * offset (IO) of the last inner message. Then it computes RO and AO of previous messages: |
| * |
| * RO = IO_of_a_message - IO_of_the_last_message |
| * AO = AO_Of_Last_Inner_Message + RO |
| * |
| * 4. This solution works for compacted message sets as well. |
| * |
| */ |
| class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging { |
| private var shallowValidByteCount = -1 |
| |
| private[kafka] def this(compressionCodec: CompressionCodec, |
| offsetCounter: LongRef, |
| wrapperMessageTimestamp: Option[Long], |
| timestampType: TimestampType, |
| messages: Message*) { |
| this(ByteBufferMessageSet.create(OffsetAssigner(offsetCounter, messages.size), compressionCodec, |
| wrapperMessageTimestamp, timestampType, messages:_*)) |
| } |
| |
| def this(compressionCodec: CompressionCodec, offsetCounter: LongRef, messages: Message*) { |
| this(compressionCodec, offsetCounter, None, TimestampType.CREATE_TIME, messages:_*) |
| } |
| |
| def this(compressionCodec: CompressionCodec, offsetSeq: Seq[Long], messages: Message*) { |
| this(ByteBufferMessageSet.create(new OffsetAssigner(offsetSeq), compressionCodec, |
| None, TimestampType.CREATE_TIME, messages:_*)) |
| } |
| |
| def this(compressionCodec: CompressionCodec, messages: Message*) { |
| this(compressionCodec, new LongRef(0L), messages: _*) |
| } |
| |
| def this(messages: Message*) { |
| this(NoCompressionCodec, messages: _*) |
| } |
| |
| def getBuffer = buffer |
| |
| private def shallowValidBytes: Int = { |
| if (shallowValidByteCount < 0) { |
| this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset => |
| MessageSet.entrySize(messageAndOffset.message) |
| }.sum |
| } |
| shallowValidByteCount |
| } |
| |
| /** Write the messages in this set to the given channel */ |
| def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int = { |
| // Ignore offset and size from input. We just want to write the whole buffer to the channel. |
| buffer.mark() |
| var written = 0 |
| while(written < sizeInBytes) |
| written += channel.write(buffer) |
| buffer.reset() |
| written |
| } |
| |
| override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { |
| for (messageAndOffset <- shallowIterator) { |
| if (messageAndOffset.message.magic != expectedMagicValue) |
| return false |
| } |
| true |
| } |
| |
| /** default iterator that iterates over decompressed messages */ |
| override def iterator: Iterator[MessageAndOffset] = internalIterator() |
| |
| /** iterator over compressed messages without decompressing */ |
| def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true) |
| |
| /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/ |
| private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { |
| new IteratorTemplate[MessageAndOffset] { |
| var topIter = buffer.slice() |
| var innerIter: Iterator[MessageAndOffset] = null |
| |
| def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext) |
| |
| def makeNextOuter: MessageAndOffset = { |
| // if there isn't at least an offset and size, we are done |
| if (topIter.remaining < 12) |
| return allDone() |
| val offset = topIter.getLong() |
| val size = topIter.getInt() |
| if(size < Message.MinMessageOverhead) |
| throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator") |
| |
| // we have an incomplete message |
| if(topIter.remaining < size) |
| return allDone() |
| |
| // read the current message and check correctness |
| val message = topIter.slice() |
| message.limit(size) |
| topIter.position(topIter.position + size) |
| val newMessage = new Message(message) |
| if(isShallow) { |
| new MessageAndOffset(newMessage, offset) |
| } else { |
| newMessage.compressionCodec match { |
| case NoCompressionCodec => |
| innerIter = null |
| new MessageAndOffset(newMessage, offset) |
| case _ => |
| innerIter = ByteBufferMessageSet.deepIterator(new MessageAndOffset(newMessage, offset)) |
| if(!innerIter.hasNext) |
| innerIter = null |
| makeNext() |
| } |
| } |
| } |
| |
| override def makeNext(): MessageAndOffset = { |
| if(isShallow){ |
| makeNextOuter |
| } else { |
| if(innerDone()) |
| makeNextOuter |
| else |
| innerIter.next() |
| } |
| } |
| |
| } |
| } |
| |
| /** |
| * Update the offsets for this message set and do further validation on messages including: |
| * 1. Messages for compacted topics must have keys |
| * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets |
| * starting from 0. |
| * 3. When magic value = 1, validate and maybe overwrite timestamps of messages. |
| * |
| * This method will convert the messages in the following scenarios: |
| * A. Magic value of a message = 0 and messageFormatVersion is 1 |
| * B. Magic value of a message = 1 and messageFormatVersion is 0 |
| * |
| * If no format conversion or value overwriting is required for messages, this method will perform in-place |
| * operations and avoid re-compression. |
| * |
| * Returns the message set and a boolean indicating whether the message sizes may have changed. |
| */ |
| private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef, |
| now: Long, |
| sourceCodec: CompressionCodec, |
| targetCodec: CompressionCodec, |
| compactedTopic: Boolean = false, |
| messageFormatVersion: Byte = Message.CurrentMagicValue, |
| messageTimestampType: TimestampType, |
| messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = { |
| if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { |
| // check the magic value |
| if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) { |
| // Message format conversion |
| (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, |
| messageFormatVersion), true) |
| } else { |
| // Do in-place validation, offset assignment and maybe set timestamp |
| (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, |
| messageTimestampDiffMaxMs), false) |
| } |
| } else { |
| // Deal with compressed messages |
| // We cannot do in place assignment in one of the following situations: |
| // 1. Source and target compression codec are different |
| // 2. When magic value to use is 0 because offsets need to be overwritten |
| // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten. |
| // 4. Message format conversion is needed. |
| |
| // No in place assignment situation 1 and 2 |
| var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0 |
| |
| var maxTimestamp = Message.NoTimestamp |
| val expectedInnerOffset = new LongRef(0) |
| val validatedMessages = new mutable.ArrayBuffer[Message] |
| this.internalIterator(isShallow = false).foreach { messageAndOffset => |
| val message = messageAndOffset.message |
| validateMessageKey(message, compactedTopic) |
| |
| if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) { |
| // No in place assignment situation 3 |
| // Validate the timestamp |
| validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs) |
| // Check if we need to overwrite offset |
| if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement()) |
| inPlaceAssignment = false |
| maxTimestamp = math.max(maxTimestamp, message.timestamp) |
| } |
| |
| if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec) |
| throw new InvalidMessageException("Compressed outer message should not have an inner message with a " + |
| s"compression attribute set: $message") |
| |
| // No in place assignment situation 4 |
| if (message.magic != messageFormatVersion) |
| inPlaceAssignment = false |
| |
| validatedMessages += message.toFormatVersion(messageFormatVersion) |
| } |
| |
| if (!inPlaceAssignment) { |
| // Cannot do in place assignment. |
| val wrapperMessageTimestamp = { |
| if (messageFormatVersion == Message.MagicValue_V0) |
| Some(Message.NoTimestamp) |
| else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME) |
| Some(maxTimestamp) |
| else // Log append time |
| Some(now) |
| } |
| |
| (new ByteBufferMessageSet(compressionCodec = targetCodec, |
| offsetCounter = offsetCounter, |
| wrapperMessageTimestamp = wrapperMessageTimestamp, |
| timestampType = messageTimestampType, |
| messages = validatedMessages: _*), true) |
| } else { |
| // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message. |
| buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1) |
| // validate the messages |
| validatedMessages.foreach(_.ensureValid()) |
| |
| var crcUpdateNeeded = true |
| val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset |
| val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset |
| val timestamp = buffer.getLong(timestampOffset) |
| val attributes = buffer.get(attributeOffset) |
| if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp) |
| // We don't need to recompute crc if the timestamp is not updated. |
| crcUpdateNeeded = false |
| else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) { |
| // Set timestamp type and timestamp |
| buffer.putLong(timestampOffset, now) |
| buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes)) |
| } |
| |
| if (crcUpdateNeeded) { |
| // need to recompute the crc value |
| buffer.position(MessageSet.LogOverhead) |
| val wrapperMessage = new Message(buffer.slice()) |
| Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum) |
| } |
| buffer.rewind() |
| (this, false) |
| } |
| } |
| } |
| |
| // We create this method to avoid a memory copy. It reads from the original message set and directly |
| // writes the converted messages into new message set buffer. Hence we don't need to allocate memory for each |
| // individual message during message format conversion. |
| private def convertNonCompressedMessages(offsetCounter: LongRef, |
| compactedTopic: Boolean, |
| now: Long, |
| timestampType: TimestampType, |
| messageTimestampDiffMaxMs: Long, |
| toMagicValue: Byte): ByteBufferMessageSet = { |
| val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset => |
| Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue) |
| }.sum |
| val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion) |
| var newMessagePosition = 0 |
| this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) => |
| validateMessageKey(message, compactedTopic) |
| validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs) |
| newBuffer.position(newMessagePosition) |
| newBuffer.putLong(offsetCounter.getAndIncrement()) |
| val newMessageSize = message.size + Message.headerSizeDiff(message.magic, toMagicValue) |
| newBuffer.putInt(newMessageSize) |
| val newMessageBuffer = newBuffer.slice() |
| newMessageBuffer.limit(newMessageSize) |
| message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType) |
| |
| newMessagePosition += MessageSet.LogOverhead + newMessageSize |
| } |
| newBuffer.rewind() |
| new ByteBufferMessageSet(newBuffer) |
| } |
| |
| private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef, |
| now: Long, |
| compactedTopic: Boolean, |
| timestampType: TimestampType, |
| timestampDiffMaxMs: Long): ByteBufferMessageSet = { |
| // do in-place validation and offset assignment |
| var messagePosition = 0 |
| buffer.mark() |
| while (messagePosition < sizeInBytes - MessageSet.LogOverhead) { |
| buffer.position(messagePosition) |
| buffer.putLong(offsetCounter.getAndIncrement()) |
| val messageSize = buffer.getInt() |
| val messageBuffer = buffer.slice() |
| messageBuffer.limit(messageSize) |
| val message = new Message(messageBuffer) |
| validateMessageKey(message, compactedTopic) |
| if (message.magic > Message.MagicValue_V0) { |
| validateTimestamp(message, now, timestampType, timestampDiffMaxMs) |
| if (timestampType == TimestampType.LOG_APPEND_TIME) { |
| message.buffer.putLong(Message.TimestampOffset, now) |
| message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes)) |
| Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum) |
| } |
| } |
| messagePosition += MessageSet.LogOverhead + messageSize |
| } |
| buffer.reset() |
| this |
| } |
| |
| private def validateMessageKey(message: Message, compactedTopic: Boolean) { |
| if (compactedTopic && !message.hasKey) |
| throw new InvalidMessageException("Compacted topic cannot accept message without key.") |
| } |
| |
| /** |
| * This method validates the timestamps of a message. |
| * If the message is using create time, this method checks if it is within acceptable range. |
| */ |
| private def validateTimestamp(message: Message, |
| now: Long, |
| timestampType: TimestampType, |
| timestampDiffMaxMs: Long) { |
| if (timestampType == TimestampType.CREATE_TIME && math.abs(message.timestamp - now) > timestampDiffMaxMs) |
| throw new InvalidTimestampException(s"Timestamp ${message.timestamp} of message is out of range. " + |
| s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}") |
| if (message.timestampType == TimestampType.LOG_APPEND_TIME) |
| throw new InvalidTimestampException(s"Invalid timestamp type in message $message. Producer should not set " + |
| s"timestamp type to LogAppendTime.") |
| } |
| |
| /** |
| * The total number of bytes in this message set, including any partial trailing messages |
| */ |
| def sizeInBytes: Int = buffer.limit |
| |
| /** |
| * The total number of bytes in this message set not including any partial, trailing messages |
| */ |
| def validBytes: Int = shallowValidBytes |
| |
| /** |
| * Two message sets are equal if their respective byte buffers are equal |
| */ |
| override def equals(other: Any): Boolean = { |
| other match { |
| case that: ByteBufferMessageSet => |
| buffer.equals(that.buffer) |
| case _ => false |
| } |
| } |
| |
| override def hashCode: Int = buffer.hashCode |
| |
| } |