blob: c81c35675e2176192a099bc347363b9ffca98929 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.message
import java.nio._
import junit.framework.Assert._
import org.junit.Test
import kafka.utils.TestUtils
import kafka.common.InvalidMessageSizeException
class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet =
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
@Test
def testSmallFetchSize() {
// create a ByteBufferMessageSet that doesn't contain a full message
// iterating it should get an InvalidMessageSizeException
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
val buffer = messages.serialized.slice
buffer.limit(10)
val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
try {
for (message <- messageSetWithNoFullMessage)
fail("shouldn't see any message")
}
catch {
case e: InvalidMessageSizeException => //this is expected
case e2 => fail("shouldn't see any other exceptions")
}
}
@Test
def testValidBytes() {
{
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
buffer.put(messages.serialized)
buffer.putShort(4)
val messagesPlus = new ByteBufferMessageSet(buffer)
assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
}
// test valid bytes on empty ByteBufferMessageSet
{
assertEquals("Valid bytes on an empty ByteBufferMessageSet should return 0", 0,
MessageSet.Empty.asInstanceOf[ByteBufferMessageSet].validBytes)
}
}
@Test
def testEquals() {
var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
assertTrue(messages.equals(moreMessages))
messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
moreMessages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
assertTrue(messages.equals(moreMessages))
}
@Test
def testIterator() {
val messageList = List(
new Message("msg1".getBytes),
new Message("msg2".getBytes),
new Message("msg3".getBytes)
)
// test for uncompressed regular messages
{
val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
//make sure shallow iterator is the same as deep iterator
TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
TestUtils.getMessageIterator(messageSet.iterator))
}
// test for compressed regular messages
{
val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
verifyShallowIterator(messageSet)
}
// test for mixed empty and non-empty messagesets uncompressed
{
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
buffer.put(emptyMessageSet.serialized)
buffer.put(regularMessgeSet.serialized)
buffer.rewind
val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
//make sure shallow iterator is the same as deep iterator
TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
TestUtils.getMessageIterator(mixedMessageSet.iterator))
}
// test for mixed empty and non-empty messagesets compressed
{
val emptyMessageList : List[Message] = Nil
val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
buffer.put(emptyMessageSet.serialized)
buffer.put(regularMessgeSet.serialized)
buffer.rewind
val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
verifyShallowIterator(mixedMessageSet)
}
}
def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
//make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator
val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
assertTrue(shallowOffsets.subsetOf(deepOffsets))
}
}