blob: a683963c9f1c5f8a7dda8e3fc6dcb394ba567d85 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.message
import java.nio._
import junit.framework.Assert._
import kafka.utils.TestUtils._
import org.junit.Test
class FileMessageSetTest extends BaseMessageSetTestCases {
val messageSet = createMessageSet(messages)
def createMessageSet(messages: Seq[Message]): FileMessageSet = {
val set = new FileMessageSet(tempFile(), true)
set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
set.flush()
set
}
@Test
def testFileSize() {
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
messageSet.append(singleMessageSet("abcd".getBytes()))
assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
}
@Test
def testIterationOverPartialAndTruncation() {
testPartialWrite(0, messageSet)
testPartialWrite(2, messageSet)
testPartialWrite(4, messageSet)
testPartialWrite(5, messageSet)
testPartialWrite(6, messageSet)
}
def testPartialWrite(size: Int, messageSet: FileMessageSet) {
val buffer = ByteBuffer.allocate(size)
val originalPosition = messageSet.channel.position
for(i <- 0 until size)
buffer.put(0.asInstanceOf[Byte])
buffer.rewind()
messageSet.channel.write(buffer)
// appending those bytes should not change the contents
checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
assertEquals("Unexpected number of bytes truncated", size.longValue, messageSet.recover())
assertEquals("File pointer should now be at the end of the file.", originalPosition, messageSet.channel.position)
// nor should recovery change the contents
checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
}
@Test
def testIterationDoesntChangePosition() {
val position = messageSet.channel.position
checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
assertEquals(position, messageSet.channel.position)
}
@Test
def testRead() {
val read = messageSet.read(0, messageSet.sizeInBytes)
checkEquals(messageSet.iterator, read.iterator)
val items = read.iterator.toList
val first = items.head
val read2 = messageSet.read(first.offset, messageSet.sizeInBytes)
checkEquals(items.tail.iterator, read2.iterator)
}
}