/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.message

import java.nio.ByteBuffer
import java.nio.channels._
import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
import kafka.utils.{IteratorTemplate, Logging}
/**
 * A sequence of messages stored in a byte buffer.
 *
 * There are two ways to create a ByteBufferMessageSet:
 *
 * Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
 *
 * Option 2: Give it a list of messages along with a compression codec that describes how they should be serialized. Producers will use this method.
 *
 * (A brief usage sketch for both options follows the constructors below.)
 */
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
private var shallowValidByteCount = -1L
if(sizeInBytes > Int.MaxValue)
throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
def this(compressionCodec: CompressionCodec, messages: Message*) {
this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
}
def this(messages: Message*) {
this(NoCompressionCodec, messages: _*)
}
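/*
 * Usage sketch for the two construction options above (illustration only, not part of the
 * original source). `fetchedBuffer` and `fetchStartOffset` are hypothetical names for data
 * obtained from a fetch response; the byte-array Message constructor and DefaultCompressionCodec
 * come from this package.
 *
 *   // producer side: build a set from messages, with or without compression
 *   val compressedSet = new ByteBufferMessageSet(DefaultCompressionCodec,
 *                                                new Message("hello".getBytes),
 *                                                new Message("world".getBytes))
 *   val plainSet = new ByteBufferMessageSet(new Message("hello".getBytes))
 *
 *   // consumer side: wrap a buffer that already holds a serialized message set
 *   val consumedSet = new ByteBufferMessageSet(fetchedBuffer, fetchStartOffset, ErrorMapping.NoError)
 */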
def getInitialOffset = initialOffset
def getBuffer = buffer
def getErrorCode = errorCode
def serialized(): ByteBuffer = buffer
def validBytes: Long = shallowValidBytes
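/** Number of bytes, relative to initialOffset, covered by complete (shallow) messages; computed lazily and cached in shallowValidByteCount */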
private def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
val iter = this.internalIterator(true)
while(iter.hasNext) {
val messageAndOffset = iter.next
shallowValidByteCount = messageAndOffset.offset
}
}
if(shallowValidByteCount < initialOffset) 0
else (shallowValidByteCount - initialOffset)
}
/** Write the messages in this set to the given channel (the offset and size arguments are currently ignored; the entire buffer is written) */
def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
buffer.mark()
val written = channel.write(buffer)
buffer.reset()
written
}
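/*
 * Sketch (illustration only): flushing the whole set to a log file. `logChannel` stands for a
 * hypothetical java.nio.channels.FileChannel opened elsewhere; the offset and size arguments
 * are ignored by this implementation.
 *
 *   val bytesWritten = messageSet.writeTo(logChannel, 0, messageSet.sizeInBytes)
 */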
/** default iterator that iterates over decompressed messages */
override def iterator: Iterator[MessageAndOffset] = internalIterator()
/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
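/*
 * Sketch (illustration only): deep vs. shallow traversal of a set named `messageSet`. The
 * default iterator decompresses wrapper messages and yields the messages nested inside them,
 * while shallowIterator yields the first-level (possibly compressed) messages as-is.
 *
 *   for(messageAndOffset <- messageSet)                    // deep iteration
 *     println(messageAndOffset.message.payloadSize)
 *   for(messageAndOffset <- messageSet.shallowIterator)    // shallow iteration
 *     println(messageAndOffset.offset)
 */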
def verifyMessageSize(maxMessageSize: Int) {
  val shallowIter = internalIterator(true)
  while(shallowIter.hasNext) {
    val messageAndOffset = shallowIter.next
    val payloadSize = messageAndOffset.message.payloadSize
    if(payloadSize > maxMessageSize)
      throw new MessageSizeTooLargeException("payload size of " + payloadSize + " larger than " + maxMessageSize)
  }
}
/** When the isShallow flag is set to true, we do a shallow iteration: just traverse the first level of messages. This is used in the verifyMessageSize() function. */
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
ErrorMapping.maybeThrowException(errorCode)
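// IteratorTemplate subclasses implement makeNext(), returning either the next element or allDone() once the set is exhausted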
new IteratorTemplate[MessageAndOffset] {
var topIter = buffer.slice()
var currValidBytes = initialOffset
var innerIter:Iterator[MessageAndOffset] = null
var lastMessageSize = 0L
def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
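// Read the next top-level message: a 4-byte size header followed by that many bytes of message data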
def makeNextOuter: MessageAndOffset = {
if (topIter.remaining < 4) {
return allDone()
}
val size = topIter.getInt()
lastMessageSize = size
trace("Remaining bytes in iterator = " + topIter.remaining)
trace("size of data = " + size)
if(size < 0 || topIter.remaining < size) {
if (currValidBytes == initialOffset || size < 0)
throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
"the fetch size; (2) log corruption )")
return allDone()
}
val message = topIter.slice()
message.limit(size)
topIter.position(topIter.position + size)
val newMessage = new Message(message)
if(!newMessage.isValid)
throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
+ " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
if(isShallow){
currValidBytes += 4 + size
trace("shallow iterator currValidBytes = " + currValidBytes)
new MessageAndOffset(newMessage, currValidBytes)
}
else{
newMessage.compressionCodec match {
case NoCompressionCodec =>
debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
currValidBytes += 4 + size
trace("currValidBytes = " + currValidBytes)
new MessageAndOffset(newMessage, currValidBytes)
case _ =>
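// compressed wrapper message: decompress it and iterate over the messages nested inside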
debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
innerIter = CompressionUtils.decompress(newMessage).internalIterator()
if (!innerIter.hasNext) {
currValidBytes += 4 + lastMessageSize
innerIter = null
}
makeNext()
}
}
}
override def makeNext(): MessageAndOffset = {
if(isShallow){
makeNextOuter
}
else{
val isInnerDone = innerDone()
debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
if(isInnerDone)
  makeNextOuter
else {
  val messageAndOffset = innerIter.next
  if(!innerIter.hasNext)
    currValidBytes += 4 + lastMessageSize
  new MessageAndOffset(messageAndOffset.message, currValidBytes)
}
}
}
}
}
def sizeInBytes: Long = buffer.limit
override def toString: String = {
val builder = new StringBuilder()
builder.append("ByteBufferMessageSet(")
for(message <- this) {
builder.append(message)
builder.append(", ")
}
builder.append(")")
builder.toString
}
override def equals(other: Any): Boolean = {
other match {
case that: ByteBufferMessageSet =>
(that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
case _ => false
}
}
override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
}