/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io._
import java.util.Properties
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
import kafka.api.ApiVersion
import kafka.common.LongRef
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.message._
import kafka.utils._
import kafka.server.KafkaConfig
import org.apache.kafka.common.utils.Utils
class LogTest extends JUnitSuite {
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val time = new MockTime(0)
var config: KafkaConfig = null
val logConfig = LogConfig()
@Before
def setUp() {
val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
config = KafkaConfig.fromProps(props)
}
@After
def tearDown() {
Utils.delete(tmpDir)
}
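/**
* Create an empty log file and index file in the given directory for each of the given base offsets.
*/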
def createEmptyLogs(dir: File, offsets: Int*) {
for(offset <- offsets) {
Log.logFilename(dir, offset).createNewFile()
Log.indexFilename(dir, offset).createNewFile()
}
}
/**
* Tests for time based log roll. This test appends messages then changes the time
* using the mock clock to force the log to roll and checks the number of segments.
*/
@Test
def testTimeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes)
val logProps = new Properties()
logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
// create a log
val log = new Log(logDir,
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
time.sleep(log.config.segmentMs + 1)
log.append(set)
assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
log.append(set)
assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments)
for(numSegments <- 3 until 5) {
time.sleep(log.config.segmentMs + 1)
log.append(set)
assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
}
val numSegments = log.numberOfSegments
time.sleep(log.config.segmentMs + 1)
log.append(new ByteBufferMessageSet())
assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
}
/**
* Tests for time-based log roll with segment jitter. This test appends messages then changes the time
* using the mock clock to force the log to roll and checks the number of segments.
*/
@Test
def testTimeBasedLogRollJitter() {
val set = TestUtils.singleMessageSet("test".getBytes)
val maxJitter = 20 * 60L
val logProps = new Properties()
logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60L: java.lang.Long)
logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long)
// create a log
val log = new Log(logDir,
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
log.append(set)
time.sleep(log.config.segmentMs - maxJitter)
log.append(set)
assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments)
time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
log.append(set)
assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
}
/**
* Test that appending more than the maximum segment size rolls the log
*/
@Test
def testSizeBasedLogRoll() {
val set = TestUtils.singleMessageSet("test".getBytes)
val setSize = set.sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
// create a log
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments roll based on size
for (i <- 1 to (msgPerSeg + 1)) {
log.append(set)
}
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
}
/**
* Test that we can open and append to an empty log
*/
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
log.append(TestUtils.singleMessageSet("test".getBytes))
}
/**
* This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
*/
@Test
def testAppendAndReadWithSequentialOffsets() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
for(i <- 0 until messages.length) {
val read = log.read(i, 100, Some(i+1)).messageSet.head
assertEquals("Offset read should match order appended.", i, read.offset)
assertEquals("Message should match appended.", messages(i), read.message)
}
assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
}
/**
* This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
* from any offset less than the logEndOffset including offsets not appended.
*/
@Test
def testAppendAndReadWithNonSequentialOffsets() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val messages = messageIds.map(id => new Message(id.toString.getBytes))
// now test the case where we assign the offsets ourselves and use non-sequential offsets
for(i <- 0 until messages.length)
log.append(new ByteBufferMessageSet(NoCompressionCodec, new LongRef(messageIds(i)), messages = messages(i)), assignOffsets = false)
for(i <- 50 until messageIds.max) {
val idx = messageIds.indexWhere(_ >= i)
val read = log.read(i, 100, None).messageSet.head
assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
assertEquals("Message should match appended.", messages(idx), read.message)
}
}
/**
* This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
* Specifically we create a log where the last message in the first segment has offset 0. If we
* then read offset 1, we should expect this read to come from the second segment, even though the
* first segment has the greatest lower bound on the offset.
*/
@Test
def testReadAtLogGap() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
// now manually truncate off all but one message from the first segment to create a gap in the messages
log.logSegments.head.truncateTo(1)
assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
}
/**
* Test reading at the boundary of the log, specifically
* - reading from the logEndOffset should give an empty message set
* - reading from the maxOffset should give an empty message set
* - reading beyond the log end offset should throw an OffsetOutOfRangeException
*/
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
try {
log.read(0, 1025)
fail("Expected exception on invalid read.")
} catch {
case e: OffsetOutOfRangeException => "This is good."
}
try {
log.read(1026, 1000)
fail("Expected exception on invalid read.")
} catch {
case e: OffsetOutOfRangeException => // This is good.
}
assertEquals("Reading from maxOffset should produce 0 byte read.", 0, log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
}
/**
* Test that covers reads and writes on a multisegment log. This test appends a bunch of messages
* and then reads them all back and checks that the message read and offset matches what was appended.
*/
@Test
def testLogRolls() {
/* create a multi-segment log with 100 messages */
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
messageSets.foreach(log.append(_))
log.flush
/* do successive reads to ensure all our messages are there */
var offset = 0L
for(i <- 0 until numMessages) {
val messages = log.read(offset, 1024*1024).messageSet
assertEquals("Offsets not equal", offset, messages.head.offset)
assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message,
messages.head.message.toFormatVersion(messageSets(i).head.message.magic))
offset = messages.head.offset + 1
}
val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
assertEquals("Should be no more messages", 0, lastRead.size)
// check that rolling the log forced a flush of the log -- the flush is async so retry in case of failure
TestUtils.retry(1000L) {
assertTrue("Log roll should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
}
}
/**
* Test reads at offsets that fall within compressed message set boundaries.
*/
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head)
/* we should always get the first message in the compressed set when reading any offset in the set */
assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset)
assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset)
assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset)
assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset)
}
/**
* Test garbage collecting old segments
*/
@Test
def testThatGarbageCollectingSegmentsDoesntChangeOffset() {
for(messagesToAppend <- List(0, 1, 25)) {
logDir.mkdirs()
// first test a log segment starting at 0
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
val currOffset = log.logEndOffset
assertEquals(currOffset, messagesToAppend)
// time goes by; the log file is deleted
log.deleteOldSegments(_ => true)
assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
assertEquals("We should still have one segment left", 1, log.numberOfSegments)
assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
currOffset,
log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset)
// cleanup the log
log.delete()
}
}
/**
* A message set cannot exceed config.segmentSize; check that this is properly enforced by
* appending a message set larger than config.segmentSize and verifying that an exception is thrown.
*/
@Test
def testMessageSetSizeCheck() {
val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message("You".getBytes), new Message("bethe".getBytes))
// append messages to log
val configSegmentSize = messageSet.sizeInBytes - 1
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
// We need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
try {
log.append(messageSet)
fail("message set should throw RecordBatchTooLargeException.")
} catch {
case e: RecordBatchTooLargeException => // this is good
}
}
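/**
* Test that a log with the compact cleanup policy rejects message sets containing unkeyed messages,
* whether compressed or not, and accepts message sets where every message has a key.
*/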
@Test
def testCompactedTopicConstraints() {
val keyedMessage = new Message(bytes = "this message has a key".getBytes, key = "and here it is".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
val anotherKeyedMessage = new Message(bytes = "this message also has a key".getBytes, key = "another key".getBytes, Message.NoTimestamp, Message.CurrentMagicValue)
val unkeyedMessage = new Message(bytes = "this message does not have a key".getBytes)
val messageSetWithUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage, keyedMessage)
val messageSetWithOneUnkeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, unkeyedMessage)
val messageSetWithCompressedKeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage)
val messageSetWithCompressedUnkeyedMessage = new ByteBufferMessageSet(GZIPCompressionCodec, keyedMessage, unkeyedMessage)
val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time)
try {
log.append(messageSetWithUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
case e: CorruptRecordException => // this is good
}
try {
log.append(messageSetWithOneUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
case e: CorruptRecordException => // this is good
}
try {
log.append(messageSetWithCompressedUnkeyedMessage)
fail("Compacted topics cannot accept a message without a key.")
} catch {
case e: CorruptRecordException => // this is good
}
// the following should succeed without any InvalidMessageException
log.append(messageSetWithKeyedMessage)
log.append(messageSetWithKeyedMessages)
log.append(messageSetWithCompressedKeyedMessage)
}
/**
* We have a max size limit on message appends; check that it is properly enforced by appending a message
* larger than the limit and verifying that an exception is thrown.
*/
@Test
def testMessageSizeCheck() {
val first = new ByteBufferMessageSet(NoCompressionCodec, new Message("You".getBytes), new Message("bethe".getBytes))
val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change (I need more bytes)".getBytes))
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
// should be able to append the small message
log.append(first)
try {
log.append(second)
fail("Second message set should throw MessageSizeTooLargeException.")
} catch {
case e: RecordTooLargeException => // this is good
}
}
/**
* Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
*/
@Test
def testLogRecoversToCorrectOffset() {
val numMessages = 100
val messageSize = 100
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
val lastIndexOffset = log.activeSegment.index.lastOffset
val numIndexEntries = log.activeSegment.index.entries
val lastOffset = log.logEndOffset
log.close()
log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
// test recovery case
log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
}
/**
* Test that if we manually delete an index segment it is rebuilt when the log is re-opened
*/
@Test
def testIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
log.close()
// delete all the index files
indexFiles.foreach(_.delete())
// reopen the log
log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
log.close()
}
/**
* Test that if we have corrupted an index segment it is rebuilt when the log is re-opened
*/
@Test
def testCorruptIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
log.close()
// corrupt all the index files
for (file <- indexFiles) {
val bw = new BufferedWriter(new FileWriter(file))
bw.write(" ")
bw.close()
}
// reopen the log
log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
log.close()
}
/**
* Test the Log truncate operations
*/
@Test
def testTruncateTo() {
val set = TestUtils.singleMessageSet("test".getBytes)
val setSize = set.sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
// create a log
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i <- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
val lastOffset = log.logEndOffset
val size = log.size
log.truncateTo(log.logEndOffset) // keep the entire log
assertEquals("Should not change offset", lastOffset, log.logEndOffset)
assertEquals("Should not change log size", size, log.size)
log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset
assertEquals("Should not change offset but should log error", lastOffset, log.logEndOffset)
assertEquals("Should not change log size", size, log.size)
log.truncateTo(msgPerSeg/2) // truncate somewhere in between
assertEquals("Should change offset", log.logEndOffset, msgPerSeg/2)
assertTrue("Should change log size", log.size < size)
log.truncateTo(0) // truncate the entire log
assertEquals("Should change offset", 0, log.logEndOffset)
assertEquals("Should change log size", 0, log.size)
for (i <- 1 to msgPerSeg)
log.append(set)
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1))
assertEquals("Should change log size", log.size, 0)
for (i <- 1 to msgPerSeg)
log.append(set)
assertTrue("Should be ahead of the original offset", log.logEndOffset > msgPerSeg)
assertEquals("log size should be same as before", size, log.size)
log.truncateTo(0) // truncate before first start offset in the log
assertEquals("Should change offset", 0, log.logEndOffset)
assertEquals("Should change log size", log.size, 0)
}
/**
* Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends
*/
@Test
def testIndexResizingAtTruncation() {
val set = TestUtils.singleMessageSet("test".getBytes)
val setSize = set.sizeInBytes
val msgPerSeg = 10
val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i <- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i <- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList(0).index.maxEntries)
log.truncateTo(0)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList(0).index.maxEntries)
for (i <- 1 to msgPerSeg)
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
}
/**
* When we open a log, any index files without an associated log segment should be deleted.
*/
@Test
def testBogusIndexSegmentsAreRemoved() {
val bogusIndex1 = Log.indexFilename(logDir, 0)
val bogusIndex2 = Log.indexFilename(logDir, 5)
val set = TestUtils.singleMessageSet("test".getBytes)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val log = new Log(logDir,
LogConfig(logProps),
recoveryPoint = 0L,
time.scheduler,
time)
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
// check that we can append to the log
for(i <- 0 until 10)
log.append(set)
log.delete()
}
/**
* Verify that truncation works correctly after re-opening the log
*/
@Test
def testReopenThenTruncate() {
val set = TestUtils.singleMessageSet("test".getBytes)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
val config = LogConfig(logProps)
// create a log
var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
// add enough messages to roll over several segments then close and re-open and attempt to truncate
for(i <- 0 until 100)
log.append(set)
log.close()
log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
log.truncateTo(3)
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
/**
* Test that files scheduled for deletion are removed after the configured file delete delay.
*/
@Test
def testAsyncDelete() {
val set = TestUtils.singleMessageSet("test".getBytes)
val asyncDeleteMs = 1000
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
// append some messages to create some segments
for(i <- 0 until 100)
log.append(set)
// files should be renamed
val segments = log.logSegments.toArray
val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
log.deleteOldSegments((s) => true)
assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
segments.forall(_.index.file.exists))
assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
// when enough time passes the files should be deleted
val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
time.sleep(asyncDeleteMs + 1)
assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
}
/**
* Any files ending in .deleted should be removed when the log is re-opened.
*/
@Test
def testOpenDeletesObsoleteFiles() {
val set = TestUtils.singleMessageSet("test".getBytes)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
// append some messages to create some segments
for(i <- 0 until 100)
log.append(set)
log.deleteOldSegments((s) => true)
log.close()
log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
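/**
* Test that a message with a null payload can be appended and read back with a null payload.
*/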
@Test
def testAppendMessageWithNullPayload() {
val log = new Log(logDir,
LogConfig(),
recoveryPoint = 0L,
time.scheduler,
time)
log.append(new ByteBufferMessageSet(new Message(bytes = null)))
val messageSet = log.read(0, 4096, None).messageSet
assertEquals(0, messageSet.head.offset)
assertTrue("Message payload should be null.", messageSet.head.message.isNull)
}
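/**
* Test that appending a message set with pre-assigned offsets that are not monotonically increasing
* throws an IllegalArgumentException.
*/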
@Test(expected = classOf[IllegalArgumentException])
def testAppendWithOutOfOrderOffsetsThrowsException() {
val log = new Log(logDir,
LogConfig(),
recoveryPoint = 0L,
time.scheduler,
time)
val messages = (0 until 2).map(id => new Message(id.toString.getBytes)).toArray
messages.foreach(message => log.append(new ByteBufferMessageSet(message)))
val invalidMessage = new ByteBufferMessageSet(new Message(1.toString.getBytes))
log.append(invalidMessage, assignOffsets = false)
}
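/**
* Test log recovery after corruption: append random bytes to the active segment's log and index files,
* then re-open the log and check that recovery restores the messages that were appended.
*/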
@Test
def testCorruptLog() {
// append some messages to create some segments
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
val config = LogConfig(logProps)
val set = TestUtils.singleMessageSet("test".getBytes)
val recoveryPoint = 50L
for(iteration <- 0 until 50) {
// create a log and write some messages to it
logDir.mkdirs()
var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
val numMessages = 50 + TestUtils.random.nextInt(50)
for(i <- 0 until numMessages)
log.append(set)
val messages = log.logSegments.flatMap(_.log.iterator.toList)
log.close()
// corrupt index and log by appending random bytes
TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
// attempt recovery
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
assertEquals(numMessages, log.logEndOffset)
assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
Utils.delete(logDir)
}
}
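/**
* Test that when a clean shutdown file is present, re-opening the log skips recovery and the
* log end offset is preserved even if the recovery point passed in is stale.
*/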
@Test
def testCleanShutdownFile() {
// append some messages to create some segments
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
val set = TestUtils.singleMessageSet("test".getBytes)
val parentLogDir = logDir.getParentFile
assertTrue("Data directory %s must exist", parentLogDir.isDirectory)
val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
cleanShutdownFile.createNewFile()
assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
var recoveryPoint = 0L
// create a log and write some messages to it
var log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
for(i <- 0 until 100)
log.append(set)
log.close()
// check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
// clean shutdown file exists.
recoveryPoint = log.logEndOffset
log = new Log(logDir, config, 0L, time.scheduler, time)
assertEquals(recoveryPoint, log.logEndOffset)
cleanShutdownFile.delete()
}
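/**
* Test that a directory name of the form topic-partition is parsed into its topic and partition.
*/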
@Test
def testParseTopicPartitionName() {
val topic = "test_topic"
val partition = "143"
val dir = new File(logDir + topicPartitionName(topic, partition))
val topicAndPartition = Log.parseTopicPartitionName(dir)
assertEquals(topic, topicAndPartition.asTuple._1)
assertEquals(partition.toInt, topicAndPartition.asTuple._2)
}
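/**
* Test that parsing an empty directory name as a topic-partition fails.
*/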
@Test
def testParseTopicPartitionNameForEmptyName() {
try {
val dir = new File("")
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // this is good
}
}
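/**
* Test that parsing a null directory as a topic-partition fails.
*/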
@Test
def testParseTopicPartitionNameForNull() {
try {
val dir: File = null
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir)
} catch {
case e: Exception => // this is good
}
}
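/**
* Test that a directory name missing the '-' separator between topic and partition fails to parse.
*/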
@Test
def testParseTopicPartitionNameForMissingSeparator() {
val topic = "test_topic"
val partition = "1999"
val dir = new File(logDir + File.separator + topic + partition)
try {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // this is good
}
}
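/**
* Test that a directory name with an empty topic part fails to parse.
*/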
@Test
def testParseTopicPartitionNameForMissingTopic() {
val topic = ""
val partition = "1999"
val dir = new File(logDir + topicPartitionName(topic, partition))
try {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // this is good
}
}
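/**
* Test that a directory name with an empty partition part fails to parse.
*/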
@Test
def testParseTopicPartitionNameForMissingPartition() {
val topic = "test_topic"
val partition = ""
val dir = new File(logDir + topicPartitionName(topic, partition))
try {
Log.parseTopicPartitionName(dir)
fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
} catch {
case e: Exception => // this is good
}
}
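/** Build the relative directory name for a topic-partition, i.e. File.separator + topic + "-" + partition. */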
def topicPartitionName(topic: String, partition: String): String =
File.separator + topic + "-" + partition
}