| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.log |
| |
| import java.io._ |
| import java.nio.ByteBuffer |
| import java.nio.file.{Files, Paths} |
| import java.util.regex.Pattern |
| import java.util.{Collections, Optional, Properties} |
| |
| import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} |
| import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} |
| import kafka.log.Log.DeleteDirSuffix |
| import kafka.server.checkpoints.LeaderEpochCheckpointFile |
| import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} |
| import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} |
| import kafka.utils._ |
| import org.apache.kafka.common.{KafkaException, TopicPartition} |
| import org.apache.kafka.common.errors._ |
| import org.apache.kafka.common.record.FileRecords.TimestampAndOffset |
| import org.apache.kafka.common.record.MemoryRecords.RecordFilter |
| import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention |
| import org.apache.kafka.common.record._ |
| import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction |
| import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse} |
| import org.apache.kafka.common.utils.{Time, Utils} |
| import org.easymock.EasyMock |
| import org.junit.Assert._ |
| import org.junit.{After, Before, Test} |
| import org.scalatest.Assertions |
| |
| import scala.collection.{Iterable, mutable} |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.ListBuffer |
| import org.scalatest.Assertions.{assertThrows, intercept, withClue} |
| |
| class LogTest { |
| var config: KafkaConfig = null |
| val brokerTopicStats = new BrokerTopicStats |
| val tmpDir = TestUtils.tempDir() |
| val logDir = TestUtils.randomPartitionLogDir(tmpDir) |
| val mockTime = new MockTime() |
| |
| @Before |
| def setUp() { |
| val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1) |
| config = KafkaConfig.fromProps(props) |
| } |
| |
| @After |
| def tearDown() { |
| brokerTopicStats.close() |
| Utils.delete(tmpDir) |
| } |
| |
| def createEmptyLogs(dir: File, offsets: Int*) { |
| for(offset <- offsets) { |
| Log.logFile(dir, offset).createNewFile() |
| Log.offsetIndexFile(dir, offset).createNewFile() |
| } |
| } |
| |
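| /** |
| * Test that the directory name generated for a partition marked for deletion matches the |
| * expected "topic-partition.uuid-delete" pattern and never exceeds 255 characters, even for |
| * topic names close to the length limit. |
| */ |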
| @Test |
| def testLogDeleteDirName(): Unit = { |
| val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3)) |
| assertTrue(name1.length <= 255) |
| assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches()) |
| assertTrue(Log.DeleteDirPattern.matcher(name1).matches()) |
| assertFalse(Log.FutureDirPattern.matcher(name1).matches()) |
| val name2 = Log.logDeleteDirName( |
| new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5)) |
| assertEquals(255, name2.length) |
| assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches()) |
| assertTrue(Log.DeleteDirPattern.matcher(name2).matches()) |
| assertFalse(Log.FutureDirPattern.matcher(name2).matches()) |
| } |
| |
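| /** |
| * Test that Log.offsetFromFile recovers the base offset encoded in the names of log, offset |
| * index, time index and producer snapshot files. |
| */ |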
| @Test |
| def testOffsetFromFile() { |
| val offset = 23423423L |
| |
| val logFile = Log.logFile(tmpDir, offset) |
| assertEquals(offset, Log.offsetFromFile(logFile)) |
| |
| val offsetIndexFile = Log.offsetIndexFile(tmpDir, offset) |
| assertEquals(offset, Log.offsetFromFile(offsetIndexFile)) |
| |
| val timeIndexFile = Log.timeIndexFile(tmpDir, offset) |
| assertEquals(offset, Log.offsetFromFile(timeIndexFile)) |
| |
| val snapshotFile = Log.producerSnapshotFile(tmpDir, offset) |
| assertEquals(offset, Log.offsetFromFile(snapshotFile)) |
| } |
| |
| /** |
| * Tests for time-based log roll. This test appends messages, then advances the mock clock to |
| * force the log to roll, and checks the number of segments. |
| */ |
| @Test |
| def testTimeBasedLogRoll() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L) |
| |
| // create a log |
| val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = 24 * 60) |
| assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) |
| // Test the segment rolling behavior when messages do not have a timestamp. |
| mockTime.sleep(log.config.segmentMs + 1) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments) |
| |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments) |
| |
| for (numSegments <- 3 until 5) { |
| mockTime.sleep(log.config.segmentMs + 1) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) |
| } |
| |
| // Append a message with a timestamp to a segment whose first message does not have a timestamp. |
| val timestamp = mockTime.milliseconds + log.config.segmentMs + 1 |
| def createRecordsWithTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = timestamp) |
| log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) |
| assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments) |
| |
| // Test the segment rolling behavior when messages have timestamps. |
| mockTime.sleep(log.config.segmentMs + 1) |
| log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) |
| assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments) |
| |
| // move the wall clock beyond log rolling time |
| mockTime.sleep(log.config.segmentMs + 1) |
| log.appendAsLeader(createRecordsWithTimestamp, leaderEpoch = 0) |
| assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments) |
| |
| val recordWithExpiredTimestamp = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| log.appendAsLeader(recordWithExpiredTimestamp, leaderEpoch = 0) |
| assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments) |
| |
| val numSegments = log.numberOfSegments |
| mockTime.sleep(log.config.segmentMs + 1) |
| log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE), leaderEpoch = 0) |
| assertEquals("Appending an empty message set should not roll log even if sufficient time has passed.", numSegments, log.numberOfSegments) |
| } |
| |
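| /** |
| * Test that rolling the empty active segment onto its own base offset recreates the segment |
| * and that subsequent appends and rolls continue to work. |
| */ |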
| @Test |
| def testRollSegmentThatAlreadyExists() { |
| val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L) |
| |
| // create a log |
| val log = createLog(logDir, logConfig) |
| assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) |
| |
| // rolling the active segment onto the same base offset while it has size zero should recreate the segment |
| log.roll(Some(0L)) |
| assertEquals("Expect 1 segment after roll() empty segment with base offset.", 1, log.numberOfSegments) |
| |
| // should be able to append records to active segment |
| val records = TestUtils.records( |
| List(new SimpleRecord(mockTime.milliseconds, "k1".getBytes, "v1".getBytes)), |
| baseOffset = 0L, partitionLeaderEpoch = 0) |
| log.appendAsFollower(records) |
| assertEquals("Expect one segment.", 1, log.numberOfSegments) |
| assertEquals(0L, log.activeSegment.baseOffset) |
| |
| // make sure we can append more records |
| val records2 = TestUtils.records( |
| List(new SimpleRecord(mockTime.milliseconds + 10, "k2".getBytes, "v2".getBytes)), |
| baseOffset = 1L, partitionLeaderEpoch = 0) |
| log.appendAsFollower(records2) |
| |
| assertEquals("Expect two records in the log", 2, log.logEndOffset) |
| assertEquals(0, readLog(log, 0, 100, Some(1)).records.batches.iterator.next().lastOffset) |
| assertEquals(1, readLog(log, 1, 100, Some(2)).records.batches.iterator.next().lastOffset) |
| |
| // roll so that active segment is empty |
| log.roll() |
| assertEquals("Expect base offset of active segment to be LEO", 2L, log.activeSegment.baseOffset) |
| assertEquals("Expect two segments.", 2, log.numberOfSegments) |
| |
| // manually resize offset index to force roll of an empty active segment on next append |
| log.activeSegment.offsetIndex.resize(0) |
| val records3 = TestUtils.records( |
| List(new SimpleRecord(mockTime.milliseconds + 12, "k3".getBytes, "v3".getBytes)), |
| baseOffset = 2L, partitionLeaderEpoch = 0) |
| log.appendAsFollower(records3) |
| assertTrue(log.activeSegment.offsetIndex.maxEntries > 1) |
| assertEquals(2, readLog(log, 2, 100, Some(3)).records.batches.iterator.next().lastOffset) |
| assertEquals("Expect two segments.", 2, log.numberOfSegments) |
| } |
| |
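| /** |
| * Test that appending a non-consecutive sequence number for an existing producer id fails |
| * with an OutOfOrderSequenceException. |
| */ |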
| @Test(expected = classOf[OutOfOrderSequenceException]) |
| def testNonSequentialAppend(): Unit = { |
| // create a log |
| val log = createLog(logDir, LogConfig()) |
| val pid = 1L |
| val epoch: Short = 0 |
| |
| val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 0) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| |
| val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = epoch, sequence = 2) |
| log.appendAsLeader(nextRecords, leaderEpoch = 0) |
| } |
| |
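| /** |
| * Test that truncating below the base offset of the active segment deletes that segment and |
| * leaves the log appendable. |
| */ |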
| @Test |
| def testTruncateToEmptySegment(): Unit = { |
| val log = createLog(logDir, LogConfig()) |
| |
| // Force a segment roll by using a large offset. The first segment will be empty |
| val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), |
| baseOffset = Int.MaxValue.toLong + 200) |
| appendAsFollower(log, records) |
| assertEquals(0, log.logSegments.head.size) |
| assertEquals(2, log.logSegments.size) |
| |
| // Truncate to an offset before the base offset of the latest segment |
| log.truncateTo(0L) |
| assertEquals(1, log.logSegments.size) |
| |
| // Now verify that we can still append to the active segment |
| appendAsFollower(log, TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), |
| baseOffset = 100L)) |
| assertEquals(1, log.logSegments.size) |
| assertEquals(101L, log.logEndOffset) |
| } |
| |
| @Test |
| def testInitializationOfProducerSnapshotsUpgradePath(): Unit = { |
| // simulate the upgrade path by creating a new log with several segments, deleting the |
| // snapshot files, and then reloading the log |
| val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10) |
| var log = createLog(logDir, logConfig) |
| assertEquals(None, log.oldestProducerSnapshotOffset) |
| |
| for (i <- 0 to 100) { |
| val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) |
| log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) |
| } |
| assertTrue(log.logSegments.size >= 2) |
| val logEndOffset = log.logEndOffset |
| log.close() |
| |
| val cleanShutdownFile = createCleanShutdownFile() |
| deleteProducerSnapshotFiles() |
| |
| // Reload after clean shutdown |
| log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) |
| var expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).takeRight(2).toVector :+ log.logEndOffset |
| assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) |
| log.close() |
| |
| Utils.delete(cleanShutdownFile) |
| deleteProducerSnapshotFiles() |
| |
| // Reload after unclean shutdown with recoveryPoint set to log end offset |
| log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) |
| assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) |
| log.close() |
| |
| deleteProducerSnapshotFiles() |
| |
| // Reload after unclean shutdown with recoveryPoint set to 0 |
| log = createLog(logDir, logConfig, recoveryPoint = 0L) |
| // We progressively create a snapshot for each segment after the recovery point |
| expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset |
| assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) |
| log.close() |
| } |
| |
| @Test |
| def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = { |
| testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version) |
| } |
| |
| @Test |
| def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat(): Unit = { |
| testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version) |
| } |
| |
| @Test |
| def testLogReinitializeAfterManualDelete(): Unit = { |
| val logConfig = LogTest.createLogConfig() |
| // simulate a case where log data does not exist but the start offset is non-zero |
| val log = createLog(logDir, logConfig, logStartOffset = 500) |
| assertEquals(500, log.logStartOffset) |
| assertEquals(500, log.logEndOffset) |
| } |
| |
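| /** |
| * Test that if recovery after a reopen truncates the log to below the log start offset, the |
| * log is re-initialized with a single empty active segment starting at the log start offset. |
| */ |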
| @Test |
| def testLogEndLessThanStartAfterReopen(): Unit = { |
| val logConfig = LogTest.createLogConfig() |
| var log = createLog(logDir, logConfig) |
| for (i <- 0 until 5) { |
| val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) |
| log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) |
| log.roll() |
| } |
| assertEquals(6, log.logSegments.size) |
| |
| // Increment the log start offset |
| val startOffset = 4 |
| log.maybeIncrementLogStartOffset(startOffset) |
| assertTrue(log.logEndOffset > log.logStartOffset) |
| |
| // Append garbage to a segment below the current log start offset |
| val segmentToForceTruncation = log.logSegments.take(2).last |
| val bw = new BufferedWriter(new FileWriter(segmentToForceTruncation.log.file)) |
| bw.write("corruptRecord") |
| bw.close() |
| log.close() |
| |
| // Reopen the log. This will truncate the segment to which we appended garbage and delete all other segments. |
| // All remaining segments will be lower than the current log start offset, which will force deletion of all segments |
| // and recreation of a single, active segment starting at logStartOffset. |
| log = createLog(logDir, logConfig, logStartOffset = startOffset) |
| assertEquals(1, log.logSegments.size) |
| assertEquals(startOffset, log.logStartOffset) |
| assertEquals(startOffset, log.logEndOffset) |
| } |
| |
| private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion) |
| var log = createLog(logDir, logConfig) |
| assertEquals(None, log.oldestProducerSnapshotOffset) |
| |
| for (i <- 0 to 100) { |
| val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes) |
| log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0) |
| } |
| |
| assertTrue(log.logSegments.size >= 5) |
| val segmentOffsets = log.logSegments.toVector.map(_.baseOffset) |
| val activeSegmentOffset = segmentOffsets.last |
| |
| // We want the recovery point to fall inside the fourth-from-last segment, i.e. past that segment's base offset |
| // and before the last two segments, leaving a gap of one segment. We collect the data before closing the log. |
| val offsetForSegmentAfterRecoveryPoint = segmentOffsets(segmentOffsets.size - 3) |
| val offsetForRecoveryPointSegment = segmentOffsets(segmentOffsets.size - 4) |
| val (segOffsetsBeforeRecovery, segOffsetsAfterRecovery) = segmentOffsets.toSet.partition(_ < offsetForRecoveryPointSegment) |
| val recoveryPoint = offsetForRecoveryPointSegment + 1 |
| assertTrue(recoveryPoint < offsetForSegmentAfterRecoveryPoint) |
| log.close() |
| |
| val segmentsWithReads = mutable.Set[LogSegment]() |
| val recoveredSegments = mutable.Set[LogSegment]() |
| val expectedSegmentsWithReads = mutable.Set[Long]() |
| val expectedSnapshotOffsets = mutable.Set[Long]() |
| |
| if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) { |
| expectedSegmentsWithReads += activeSegmentOffset |
| expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset |
| } else { |
| expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Set(activeSegmentOffset) |
| expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset |
| } |
| |
| def createLogWithInterceptedReads(recoveryPoint: Long) = { |
| val maxProducerIdExpirationMs = 60 * 60 * 1000 |
| val topicPartition = Log.parseTopicPartitionName(logDir) |
| val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs) |
| |
| // Intercept all segment read calls |
| new Log(logDir, logConfig, logStartOffset = 0, recoveryPoint = recoveryPoint, mockTime.scheduler, |
| brokerTopicStats, mockTime, maxProducerIdExpirationMs, LogManager.ProducerIdExpirationCheckIntervalMs, |
| topicPartition, producerStateManager, new LogDirFailureChannel(10)) { |
| |
| override def addSegment(segment: LogSegment): LogSegment = { |
| val wrapper = new LogSegment(segment.log, segment.lazyOffsetIndex, segment.lazyTimeIndex, segment.txnIndex, segment.baseOffset, |
| segment.indexIntervalBytes, segment.rollJitterMs, mockTime) { |
| |
| override def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long, |
| minOneMessage: Boolean): FetchDataInfo = { |
| segmentsWithReads += this |
| super.read(startOffset, maxOffset, maxSize, maxPosition, minOneMessage) |
| } |
| |
| override def recover(producerStateManager: ProducerStateManager, |
| leaderEpochCache: Option[LeaderEpochFileCache]): Int = { |
| recoveredSegments += this |
| super.recover(producerStateManager, leaderEpochCache) |
| } |
| } |
| super.addSegment(wrapper) |
| } |
| } |
| } |
| |
| // Retain snapshots for the last 2 segments |
| ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2)) |
| log = createLogWithInterceptedReads(offsetForRecoveryPointSegment) |
| // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour) |
| assertEquals(expectedSegmentsWithReads, segmentsWithReads.map(_.baseOffset)) |
| assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) |
| assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets.toSet) |
| log.close() |
| segmentsWithReads.clear() |
| recoveredSegments.clear() |
| |
| // Only delete snapshots before the base offset of the recovery point segment (post KAFKA-5829 behaviour) to |
| // avoid reading all segments |
| ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment) |
| log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint) |
| assertEquals(Set(activeSegmentOffset), segmentsWithReads.map(_.baseOffset)) |
| assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) |
| assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets.toSet) |
| |
| // Verify that we keep 2 snapshot files if we checkpoint the log end offset |
| log.deleteSnapshotsAfterRecoveryPointCheckpoint() |
| val expectedSnapshotsAfterDelete = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset |
| assertEquals(expectedSnapshotsAfterDelete, listProducerSnapshotOffsets) |
| log.close() |
| } |
| |
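| /** |
| * Test that Log.sizeInBytes sums segment sizes as a Long so that logs larger than Int.MaxValue |
| * bytes are reported without overflow. |
| */ |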
| @Test |
| def testSizeForLargeLogs(): Unit = { |
| val largeSize = Int.MaxValue.toLong * 2 |
| val logSegment: LogSegment = EasyMock.createMock(classOf[LogSegment]) |
| |
| EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes |
| EasyMock.replay(logSegment) |
| |
| assertEquals(Int.MaxValue, Log.sizeInBytes(Seq(logSegment))) |
| assertEquals(largeSize, Log.sizeInBytes(Seq(logSegment, logSegment))) |
| assertTrue(Log.sizeInBytes(Seq(logSegment, logSegment)) > Int.MaxValue) |
| } |
| |
| @Test |
| def testProducerIdMapOffsetUpdatedForNonIdempotentData() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes))) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| assertEquals(Some(1), log.latestProducerSnapshotOffset) |
| } |
| |
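| /** |
| * Test that truncation skips reloading producer state from the log when the producer state |
| * manager is empty, going straight to updating the map end offset and taking a snapshot. |
| */ |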
| @Test |
| def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = { |
| val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) |
| |
| // Load the log |
| EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) |
| |
| stateManager.updateMapEndOffset(0L) |
| EasyMock.expectLastCall().anyTimes() |
| |
| EasyMock.expect(stateManager.mapEndOffset).andStubReturn(0L) |
| EasyMock.expect(stateManager.isEmpty).andStubReturn(true) |
| |
| stateManager.takeSnapshot() |
| EasyMock.expectLastCall().anyTimes() |
| |
| stateManager.truncateAndReload(EasyMock.eq(0L), EasyMock.eq(0L), EasyMock.anyLong) |
| EasyMock.expectLastCall() |
| |
| EasyMock.expect(stateManager.firstUnstableOffset).andStubReturn(None) |
| |
| EasyMock.replay(stateManager) |
| |
| val config = LogConfig(new Properties()) |
| val log = new Log(logDir, |
| config, |
| logStartOffset = 0L, |
| recoveryPoint = 0L, |
| scheduler = mockTime.scheduler, |
| brokerTopicStats = brokerTopicStats, |
| time = mockTime, |
| maxProducerIdExpirationMs = 300000, |
| producerIdExpirationCheckIntervalMs = 30000, |
| topicPartition = Log.parseTopicPartitionName(logDir), |
| producerStateManager = stateManager, |
| logDirFailureChannel = null) |
| |
| EasyMock.verify(stateManager) |
| |
| // Append some messages |
| EasyMock.reset(stateManager) |
| EasyMock.expect(stateManager.firstUnstableOffset).andStubReturn(None) |
| |
| stateManager.updateMapEndOffset(1L) |
| EasyMock.expectLastCall() |
| stateManager.updateMapEndOffset(2L) |
| EasyMock.expectLastCall() |
| |
| EasyMock.replay(stateManager) |
| |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0) |
| |
| EasyMock.verify(stateManager) |
| |
| // Now truncate |
| EasyMock.reset(stateManager) |
| EasyMock.expect(stateManager.firstUnstableOffset).andStubReturn(None) |
| EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) |
| EasyMock.expect(stateManager.isEmpty).andStubReturn(true) |
| EasyMock.expect(stateManager.mapEndOffset).andReturn(2L) |
| stateManager.truncateAndReload(EasyMock.eq(0L), EasyMock.eq(1L), EasyMock.anyLong) |
| EasyMock.expectLastCall() |
| // Truncation causes the map end offset to reset to 0 |
| EasyMock.expect(stateManager.mapEndOffset).andReturn(0L) |
| // We skip directly to updating the map end offset |
| stateManager.updateMapEndOffset(1L) |
| EasyMock.expectLastCall() |
| // Finally, we take a snapshot |
| stateManager.takeSnapshot() |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.replay(stateManager) |
| |
| log.truncateTo(1L) |
| |
| EasyMock.verify(stateManager) |
| } |
| |
| @Test |
| def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = { |
| val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) |
| |
| stateManager.updateMapEndOffset(0L) |
| EasyMock.expectLastCall().anyTimes() |
| |
| stateManager.takeSnapshot() |
| EasyMock.expectLastCall().anyTimes() |
| |
| EasyMock.expect(stateManager.isEmpty).andReturn(true) |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None) |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.replay(stateManager) |
| |
| val logProps = new Properties() |
| logProps.put(LogConfig.MessageFormatVersionProp, "0.10.2") |
| val config = LogConfig(logProps) |
| new Log(logDir, |
| config, |
| logStartOffset = 0L, |
| recoveryPoint = 0L, |
| scheduler = mockTime.scheduler, |
| brokerTopicStats = brokerTopicStats, |
| time = mockTime, |
| maxProducerIdExpirationMs = 300000, |
| producerIdExpirationCheckIntervalMs = 30000, |
| topicPartition = Log.parseTopicPartitionName(logDir), |
| producerStateManager = stateManager, |
| logDirFailureChannel = null) |
| |
| EasyMock.verify(stateManager) |
| } |
| |
| @Test |
| def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = { |
| val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) |
| |
| stateManager.updateMapEndOffset(0L) |
| EasyMock.expectLastCall().anyTimes() |
| |
| stateManager.takeSnapshot() |
| EasyMock.expectLastCall().anyTimes() |
| |
| EasyMock.expect(stateManager.isEmpty).andReturn(true) |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None) |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.replay(stateManager) |
| |
| val cleanShutdownFile = createCleanShutdownFile() |
| |
| val logProps = new Properties() |
| logProps.put(LogConfig.MessageFormatVersionProp, "0.10.2") |
| val config = LogConfig(logProps) |
| new Log(logDir, |
| config, |
| logStartOffset = 0L, |
| recoveryPoint = 0L, |
| scheduler = mockTime.scheduler, |
| brokerTopicStats = brokerTopicStats, |
| time = mockTime, |
| maxProducerIdExpirationMs = 300000, |
| producerIdExpirationCheckIntervalMs = 30000, |
| topicPartition = Log.parseTopicPartitionName(logDir), |
| producerStateManager = stateManager, |
| logDirFailureChannel = null) |
| |
| EasyMock.verify(stateManager) |
| Utils.delete(cleanShutdownFile) |
| } |
| |
| @Test |
| def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = { |
| val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager]) |
| |
| EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) |
| |
| stateManager.updateMapEndOffset(0L) |
| EasyMock.expectLastCall().anyTimes() |
| |
| stateManager.takeSnapshot() |
| EasyMock.expectLastCall().anyTimes() |
| |
| EasyMock.expect(stateManager.isEmpty).andReturn(true) |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None) |
| EasyMock.expectLastCall().once() |
| |
| EasyMock.replay(stateManager) |
| |
| val cleanShutdownFile = createCleanShutdownFile() |
| |
| val logProps = new Properties() |
| logProps.put(LogConfig.MessageFormatVersionProp, "0.11.0") |
| val config = LogConfig(logProps) |
| new Log(logDir, |
| config, |
| logStartOffset = 0L, |
| recoveryPoint = 0L, |
| scheduler = mockTime.scheduler, |
| brokerTopicStats = brokerTopicStats, |
| time = mockTime, |
| maxProducerIdExpirationMs = 300000, |
| producerIdExpirationCheckIntervalMs = 30000, |
| topicPartition = Log.parseTopicPartitionName(logDir), |
| producerStateManager = stateManager, |
| logDirFailureChannel = null) |
| |
| EasyMock.verify(stateManager) |
| Utils.delete(cleanShutdownFile) |
| } |
| |
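| /** |
| * Test that the producer state map is rebuilt correctly from a batch containing offset gaps, |
| * as produced by compaction, after truncation forces the state to be reloaded from the log. |
| */ |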
| @Test |
| def testRebuildProducerIdMapWithCompactedData() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val pid = 1L |
| val epoch = 0.toShort |
| val seq = 0 |
| val baseOffset = 23L |
| |
| // create a batch with a couple of gaps to simulate compaction |
| val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( |
| new SimpleRecord(System.currentTimeMillis(), "a".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "c".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "d".getBytes))) |
| records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) |
| |
| val filtered = ByteBuffer.allocate(2048) |
| records.filterTo(new TopicPartition("foo", 0), new RecordFilter { |
| override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY |
| override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey |
| }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) |
| filtered.flip() |
| val filteredRecords = MemoryRecords.readableRecords(filtered) |
| |
| log.appendAsFollower(filteredRecords) |
| |
| // append some more data and then truncate to force rebuilding of the PID map |
| val moreRecords = TestUtils.records(baseOffset = baseOffset + 4, records = List( |
| new SimpleRecord(System.currentTimeMillis(), "e".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "f".getBytes))) |
| moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) |
| log.appendAsFollower(moreRecords) |
| |
| log.truncateTo(baseOffset + 4) |
| |
| val activeProducers = log.activeProducersWithLastSequence |
| assertTrue(activeProducers.contains(pid)) |
| |
| val lastSeq = activeProducers(pid) |
| assertEquals(3, lastSeq) |
| } |
| |
| @Test |
| def testRebuildProducerStateWithEmptyCompactedBatch() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val pid = 1L |
| val epoch = 0.toShort |
| val seq = 0 |
| val baseOffset = 23L |
| |
| // create a batch whose records will all be removed by compaction-style filtering, leaving an empty batch |
| val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( |
| new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "a".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes))) |
| records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) |
| |
| val filtered = ByteBuffer.allocate(2048) |
| records.filterTo(new TopicPartition("foo", 0), new RecordFilter { |
| override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.RETAIN_EMPTY |
| override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false |
| }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) |
| filtered.flip() |
| val filteredRecords = MemoryRecords.readableRecords(filtered) |
| |
| log.appendAsFollower(filteredRecords) |
| |
| // append some more data and then truncate to force rebuilding of the PID map |
| val moreRecords = TestUtils.records(baseOffset = baseOffset + 2, records = List( |
| new SimpleRecord(System.currentTimeMillis(), "e".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "f".getBytes))) |
| moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) |
| log.appendAsFollower(moreRecords) |
| |
| log.truncateTo(baseOffset + 2) |
| |
| val activeProducers = log.activeProducersWithLastSequence |
| assertTrue(activeProducers.contains(pid)) |
| |
| val lastSeq = activeProducers(pid) |
| assertEquals(1, lastSeq) |
| } |
| |
| @Test |
| def testUpdateProducerIdMapWithCompactedData() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val pid = 1L |
| val epoch = 0.toShort |
| val seq = 0 |
| val baseOffset = 23L |
| |
| // create a batch with a couple of gaps to simulate compaction |
| val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List( |
| new SimpleRecord(System.currentTimeMillis(), "a".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "c".getBytes), |
| new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "d".getBytes))) |
| records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) |
| |
| val filtered = ByteBuffer.allocate(2048) |
| records.filterTo(new TopicPartition("foo", 0), new RecordFilter { |
| override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY |
| override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey |
| }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING) |
| filtered.flip() |
| val filteredRecords = MemoryRecords.readableRecords(filtered) |
| |
| log.appendAsFollower(filteredRecords) |
| val activeProducers = log.activeProducersWithLastSequence |
| assertTrue(activeProducers.contains(pid)) |
| |
| val lastSeq = activeProducers(pid) |
| assertEquals(3, lastSeq) |
| } |
| |
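| /** |
| * Test that truncating the log also truncates producer snapshots and the producer state end |
| * offset down to the truncation point. |
| */ |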
| @Test |
| def testProducerIdMapTruncateTo() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes))), leaderEpoch = 0) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes))), leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes))), leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| |
| log.truncateTo(2) |
| assertEquals(Some(2), log.latestProducerSnapshotOffset) |
| assertEquals(2, log.latestProducerStateEndOffset) |
| |
| log.truncateTo(1) |
| assertEquals(Some(1), log.latestProducerSnapshotOffset) |
| assertEquals(1, log.latestProducerStateEndOffset) |
| |
| log.truncateTo(0) |
| assertEquals(None, log.latestProducerSnapshotOffset) |
| assertEquals(0, log.latestProducerStateEndOffset) |
| } |
| |
| @Test |
| def testProducerIdMapTruncateToWithNoSnapshots() { |
| // This ensures that the upgrade optimization path cannot be hit after initial loading |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val pid = 1L |
| val epoch = 0.toShort |
| |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid, |
| producerEpoch = epoch, sequence = 0), leaderEpoch = 0) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid, |
| producerEpoch = epoch, sequence = 1), leaderEpoch = 0) |
| |
| deleteProducerSnapshotFiles() |
| |
| log.truncateTo(1L) |
| assertEquals(1, log.activeProducersWithLastSequence.size) |
| |
| val lastSeqOpt = log.activeProducersWithLastSequence.get(pid) |
| assertTrue(lastSeqOpt.isDefined) |
| |
| val lastSeq = lastSeqOpt.get |
| assertEquals(0, lastSeq) |
| } |
| |
| @Test |
| def testLoadProducersAfterDeleteRecordsMidSegment(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val pid1 = 1L |
| val pid2 = 2L |
| val epoch = 0.toShort |
| |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1, |
| producerEpoch = epoch, sequence = 0), leaderEpoch = 0) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2, |
| producerEpoch = epoch, sequence = 0), leaderEpoch = 0) |
| assertEquals(2, log.activeProducersWithLastSequence.size) |
| |
| log.maybeIncrementLogStartOffset(1L) |
| |
| assertEquals(1, log.activeProducersWithLastSequence.size) |
| val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2) |
| assertTrue(retainedLastSeqOpt.isDefined) |
| assertEquals(0, retainedLastSeqOpt.get) |
| |
| log.close() |
| |
| val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) |
| assertEquals(1, reloadedLog.activeProducersWithLastSequence.size) |
| val reloadedLastSeqOpt = reloadedLog.activeProducersWithLastSequence.get(pid2) |
| assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt) |
| } |
| |
| @Test |
| def testLoadProducersAfterDeleteRecordsOnSegment(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| val pid1 = 1L |
| val pid2 = 2L |
| val epoch = 0.toShort |
| |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "a".getBytes)), producerId = pid1, |
| producerEpoch = epoch, sequence = 0), leaderEpoch = 0) |
| log.roll() |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes)), producerId = pid2, |
| producerEpoch = epoch, sequence = 0), leaderEpoch = 0) |
| |
| assertEquals(2, log.logSegments.size) |
| assertEquals(2, log.activeProducersWithLastSequence.size) |
| |
| log.maybeIncrementLogStartOffset(1L) |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| |
| assertEquals(1, log.logSegments.size) |
| assertEquals(1, log.activeProducersWithLastSequence.size) |
| val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2) |
| assertTrue(retainedLastSeqOpt.isDefined) |
| assertEquals(0, retainedLastSeqOpt.get) |
| |
| log.close() |
| |
| val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) |
| assertEquals(1, reloadedLog.activeProducersWithLastSequence.size) |
| val reloadedEntryOpt = reloadedLog.activeProducersWithLastSequence.get(pid2) |
| assertEquals(retainedLastSeqOpt, reloadedEntryOpt) |
| } |
| |
| @Test |
| def testProducerIdMapTruncateFullyAndStartAt() { |
| val records = TestUtils.singletonRecords("foo".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| |
| log.appendAsLeader(TestUtils.singletonRecords("bar".getBytes), leaderEpoch = 0) |
| log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| |
| assertEquals(3, log.logSegments.size) |
| assertEquals(3, log.latestProducerStateEndOffset) |
| assertEquals(Some(3), log.latestProducerSnapshotOffset) |
| |
| log.truncateFullyAndStartAt(29) |
| assertEquals(1, log.logSegments.size) |
| assertEquals(None, log.latestProducerSnapshotOffset) |
| assertEquals(29, log.latestProducerStateEndOffset) |
| } |
| |
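| /** |
| * Test that a producer id whose last written offset falls only in a deleted segment is removed |
| * from the producer state map when old segments are deleted. |
| */ |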
| @Test |
| def testProducerIdExpirationOnSegmentDeletion() { |
| val pid1 = 1L |
| val records = TestUtils.records(Seq(new SimpleRecord("foo".getBytes)), producerId = pid1, producerEpoch = 0, sequence = 0) |
| val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes, retentionBytes = records.sizeInBytes * 2) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| |
| val pid2 = 2L |
| log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("bar".getBytes)), producerId = pid2, producerEpoch = 0, sequence = 0), |
| leaderEpoch = 0) |
| log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord("baz".getBytes)), producerId = pid2, producerEpoch = 0, sequence = 1), |
| leaderEpoch = 0) |
| log.takeProducerSnapshot() |
| |
| assertEquals(3, log.logSegments.size) |
| assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet) |
| |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| |
| assertEquals(2, log.logSegments.size) |
| assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet) |
| } |
| |
| @Test |
| def testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.singletonRecords("a".getBytes), leaderEpoch = 0) |
| log.roll(Some(1L)) |
| assertEquals(Some(1L), log.latestProducerSnapshotOffset) |
| assertEquals(Some(1L), log.oldestProducerSnapshotOffset) |
| |
| log.appendAsLeader(TestUtils.singletonRecords("b".getBytes), leaderEpoch = 0) |
| log.roll(Some(2L)) |
| assertEquals(Some(2L), log.latestProducerSnapshotOffset) |
| assertEquals(Some(1L), log.oldestProducerSnapshotOffset) |
| |
| log.appendAsLeader(TestUtils.singletonRecords("c".getBytes), leaderEpoch = 0) |
| log.roll(Some(3L)) |
| assertEquals(Some(3L), log.latestProducerSnapshotOffset) |
| |
| // roll triggers a flush at the starting offset of the new segment, so we should retain all snapshots |
| assertEquals(Some(1L), log.oldestProducerSnapshotOffset) |
| |
| // retain the snapshots from the active segment and the previous segment, delete the oldest one |
| log.deleteSnapshotsAfterRecoveryPointCheckpoint() |
| assertEquals(Some(2L), log.oldestProducerSnapshotOffset) |
| |
| // even if we flush within the active segment, the snapshot should remain |
| log.appendAsLeader(TestUtils.singletonRecords("baz".getBytes), leaderEpoch = 0) |
| log.flush(4L) |
| assertEquals(Some(3L), log.latestProducerSnapshotOffset) |
| assertEquals(Some(2L), log.oldestProducerSnapshotOffset) |
| } |
| |
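| /** |
| * Test that a producer snapshot is taken at the new segment's base offset when an append forces |
| * a size-based roll, and that truncating back to that offset restores consistent producer state |
| * from the snapshot. |
| */ |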
| @Test |
| def testProducerSnapshotAfterSegmentRollOnAppend(): Unit = { |
| val producerId = 1L |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024) |
| val log = createLog(logDir, logConfig) |
| |
| log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))), |
| producerId = producerId, producerEpoch = 0, sequence = 0), |
| leaderEpoch = 0) |
| |
| // The next append should overflow the segment and cause it to roll |
| log.appendAsLeader(TestUtils.records(Seq(new SimpleRecord(mockTime.milliseconds(), new Array[Byte](512))), |
| producerId = producerId, producerEpoch = 0, sequence = 1), |
| leaderEpoch = 0) |
| |
| assertEquals(2, log.logSegments.size) |
| assertEquals(1L, log.activeSegment.baseOffset) |
| assertEquals(Some(1L), log.latestProducerSnapshotOffset) |
| |
| // Force a reload from the snapshot to check its consistency |
| log.truncateTo(1L) |
| |
| assertEquals(2, log.logSegments.size) |
| assertEquals(1L, log.activeSegment.baseOffset) |
| assertTrue(log.activeSegment.log.batches.asScala.isEmpty) |
| assertEquals(Some(1L), log.latestProducerSnapshotOffset) |
| |
| val lastEntry = log.producerStateManager.lastEntry(producerId) |
| assertTrue(lastEntry.isDefined) |
| assertEquals(0L, lastEntry.get.firstOffset) |
| assertEquals(0L, lastEntry.get.lastDataOffset) |
| } |
| |
| @Test |
| def testRebuildTransactionalState(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val pid = 137L |
| val epoch = 5.toShort |
| val seq = 0 |
| |
| // add some transactional records |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid, epoch, seq, |
| new SimpleRecord("foo".getBytes), |
| new SimpleRecord("bar".getBytes), |
| new SimpleRecord("baz".getBytes)) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch), |
| isFromClient = false, leaderEpoch = 0) |
| log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1) |
| |
| // now there should be no first unstable offset |
| assertEquals(None, log.firstUnstableOffset) |
| |
| log.close() |
| |
| val reopenedLog = createLog(logDir, logConfig) |
| reopenedLog.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1) |
| assertEquals(None, reopenedLog.firstUnstableOffset) |
| } |
| |
| private def endTxnRecords(controlRecordType: ControlRecordType, |
| producerId: Long, |
| epoch: Short, |
| offset: Long = 0L, |
| coordinatorEpoch: Int = 0, |
| partitionLeaderEpoch: Int = 0): MemoryRecords = { |
| val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch) |
| MemoryRecords.withEndTransactionMarker(offset, mockTime.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker) |
| } |
| |
| @Test |
| def testPeriodicProducerIdExpiration() { |
| val maxProducerIdExpirationMs = 200 |
| val producerIdExpirationCheckIntervalMs = 100 |
| |
| val pid = 23L |
| val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5) |
| val log = createLog(logDir, logConfig, maxProducerIdExpirationMs = maxProducerIdExpirationMs, |
| producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs) |
| val records = Seq(new SimpleRecord(mockTime.milliseconds(), "foo".getBytes)) |
| log.appendAsLeader(TestUtils.records(records, producerId = pid, producerEpoch = 0, sequence = 0), leaderEpoch = 0) |
| |
| assertEquals(Set(pid), log.activeProducersWithLastSequence.keySet) |
| |
| mockTime.sleep(producerIdExpirationCheckIntervalMs) |
| assertEquals(Set(pid), log.activeProducersWithLastSequence.keySet) |
| |
| mockTime.sleep(producerIdExpirationCheckIntervalMs) |
| assertEquals(Set(), log.activeProducersWithLastSequence.keySet) |
| } |
| |
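| /** |
| * Test producer deduplication on the leader: exact duplicates of recently appended batches |
| * return the original append info, while partial duplicates and duplicates older than the |
| * retained batch metadata fail with an OutOfOrderSequenceException. |
| */ |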
| @Test |
| def testDuplicateAppends(): Unit = { |
| // create a log |
| val log = createLog(logDir, LogConfig()) |
| val pid = 1L |
| val epoch: Short = 0 |
| |
| var seq = 0 |
| // Pad the beginning of the log. |
| for (_ <- 0 to 5) { |
| val record = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), |
| producerId = pid, producerEpoch = epoch, sequence = seq) |
| log.appendAsLeader(record, leaderEpoch = 0) |
| seq = seq + 1 |
| } |
| // Append an entry with multiple log records. |
| def createRecords = TestUtils.records(List( |
| new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), |
| new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), |
| new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes) |
| ), producerId = pid, producerEpoch = epoch, sequence = seq) |
| val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("should have appended 3 entries", multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset.get + 1, 3) |
| |
| // Append a Duplicate of the tail, when the entry at the tail has multiple records. |
| val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("Somehow appended a duplicate entry with multiple log records to the tail", |
| multiEntryAppendInfo.firstOffset.get, dupMultiEntryAppendInfo.firstOffset.get) |
| assertEquals("Somehow appended a duplicate entry with multiple log records to the tail", |
| multiEntryAppendInfo.lastOffset, dupMultiEntryAppendInfo.lastOffset) |
| |
| seq = seq + 3 |
| |
| // Append a partial duplicate of the tail. This is not allowed. |
| try { |
| val records = TestUtils.records( |
| List( |
| new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes), |
| new SimpleRecord(mockTime.milliseconds, s"key-$seq".getBytes, s"value-$seq".getBytes)), |
| producerId = pid, producerEpoch = epoch, sequence = seq - 2) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| fail("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a records " + |
| "in the middle of the log.") |
| } catch { |
| case _: OutOfOrderSequenceException => // Good! |
| } |
| |
| // Append a duplicate of the batch which is 4th from the tail. This should succeed without error since we |
| // retain the batch metadata of the last 5 batches. |
| val duplicateOfFourth = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), |
| producerId = pid, producerEpoch = epoch, sequence = 2) |
| log.appendAsLeader(duplicateOfFourth, leaderEpoch = 0) |
| |
| // Duplicates at older entries are reported as OutOfOrderSequence errors |
| try { |
| val records = TestUtils.records( |
| List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, s"value-1".getBytes)), |
| producerId = pid, producerEpoch = epoch, sequence = 1) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| fail("Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a batch " + |
| "which is older than the last 5 appended batches.") |
| } catch { |
| case _: OutOfOrderSequenceException => // Good! |
| } |
| |
| // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry. |
| def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), |
| producerId = pid, producerEpoch = epoch, sequence = seq) |
| val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) |
| val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate, leaderEpoch = 0) |
| assertEquals("Inserted a duplicate records into the log", origAppendInfo.firstOffset.get, newAppendInfo.firstOffset.get) |
| assertEquals("Inserted a duplicate records into the log", origAppendInfo.lastOffset, newAppendInfo.lastOffset) |
| } |
| |
| @Test |
| def testMultipleProducerIdsPerMemoryRecord() : Unit = { |
| // create a log |
| val log = createLog(logDir, LogConfig()) |
| |
| val epoch: Short = 0 |
| val buffer = ByteBuffer.allocate(512) |
| |
| var builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 1L, epoch, 0, false, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 2L, epoch, 0, false, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 3L, epoch, 0, false, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 4L, epoch, 0, false, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| buffer.flip() |
| val memoryRecords = MemoryRecords.readableRecords(buffer) |
| |
| log.appendAsFollower(memoryRecords) |
| log.flush() |
| |
| val fetchedData = readLog(log, 0, Int.MaxValue) |
| |
| val origIterator = memoryRecords.batches.iterator() |
| for (batch <- fetchedData.records.batches.asScala) { |
| assertTrue(origIterator.hasNext) |
| val origEntry = origIterator.next() |
| assertEquals(origEntry.producerId, batch.producerId) |
| assertEquals(origEntry.baseOffset, batch.baseOffset) |
| assertEquals(origEntry.baseSequence, batch.baseSequence) |
| } |
| } |
| |
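| /** |
| * Test that the follower append path performs no producer deduplication, so batches with |
| * duplicate sequence numbers but increasing offsets are accepted as-is. |
| */ |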
| @Test |
| def testDuplicateAppendToFollower() : Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch: Short = 0 |
| val pid = 1L |
| val baseSequence = 0 |
| val partitionLeaderEpoch = 0 |
| // The point of this test is to ensure that validation isn't performed on the follower. |
| // This is a bit contrived: to trigger the duplicate case for a follower append, we have to append |
| // a batch with matching sequence numbers but valid, increasing offsets. |
| assertEquals(0L, log.logEndOffset) |
| log.appendAsFollower(MemoryRecords.withIdempotentRecords(0L, CompressionType.NONE, pid, epoch, baseSequence, |
| partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) |
| log.appendAsFollower(MemoryRecords.withIdempotentRecords(2L, CompressionType.NONE, pid, epoch, baseSequence, |
| partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes))) |
| |
| // Ensure that even the duplicate sequences are accepted on the follower. |
| assertEquals(4L, log.logEndOffset) |
| } |
| |
| @Test |
| def testMultipleProducersWithDuplicatesInSingleAppend() : Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val pid1 = 1L |
| val pid2 = 2L |
| val epoch: Short = 0 |
| |
| val buffer = ByteBuffer.allocate(512) |
| |
| // pid1 seq = 0 |
| var builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), pid1, epoch, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| // pid2 seq = 0 |
| builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), pid2, epoch, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| // pid1 seq = 1 |
| builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), pid1, epoch, 1) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| // pid2 seq = 1 |
| builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), pid2, epoch, 1) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| // pid1 seq = 1 (duplicate) |
| builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, 4L, mockTime.milliseconds(), pid1, epoch, 1) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| |
| buffer.flip() |
| |
| val records = MemoryRecords.readableRecords(buffer) |
| records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0)) |
| |
| // Ensure that batches with duplicates are accepted on the follower. |
| assertEquals(0L, log.logEndOffset) |
| log.appendAsFollower(records) |
| assertEquals(5L, log.logEndOffset) |
| } |
| |
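| /** |
| * Test that an append using an older producer epoch than the one already recorded for the |
| * producer id fails with a ProducerFencedException. |
| */ |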
| @Test(expected = classOf[ProducerFencedException]) |
| def testOldProducerEpoch(): Unit = { |
| // create a log |
| val log = createLog(logDir, LogConfig()) |
| val pid = 1L |
| val newEpoch: Short = 1 |
| val oldEpoch: Short = 0 |
| |
| val records = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = newEpoch, sequence = 0) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| |
| val nextRecords = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)), producerId = pid, producerEpoch = oldEpoch, sequence = 0) |
| log.appendAsLeader(nextRecords, leaderEpoch = 0) |
| } |
| |
| /** |
| * Test for jitter in time-based log roll. This test appends messages, then advances the mock |
| * clock to force the log to roll, and checks the number of segments. |
| */ |
| @Test |
| def testTimeBasedLogRollJitter() { |
| var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| val maxJitter = 20 * 60L |
| // create a log |
| val logConfig = LogTest.createLogConfig(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter) |
| val log = createLog(logDir, logConfig) |
| assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) |
| log.appendAsLeader(set, leaderEpoch = 0) |
| |
| mockTime.sleep(log.config.segmentMs - maxJitter) |
| set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| log.appendAsLeader(set, leaderEpoch = 0) |
| assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments) |
| mockTime.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) |
| set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| log.appendAsLeader(set, leaderEpoch = 0) |
| assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments) |
| } |
| |
| /** |
| * Test that appending more than the maximum segment size rolls the log |
| */ |
| @Test |
| def testSizeBasedLogRoll() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| val setSize = createRecords.sizeInBytes |
| val msgPerSeg = 10 |
| val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages |
| // create a log |
| val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize) |
| val log = createLog(logDir, logConfig) |
| assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) |
| |
| // appending more than a segment's worth of messages should force a roll into a second segment |
| for (_ <- 1 to (msgPerSeg + 1)) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) |
| } |
| |
| /** |
| * Test that we can open and append to an empty log |
| */ |
| @Test |
| def testLoadEmptyLog() { |
| createEmptyLogs(logDir, 0) |
| val log = createLog(logDir, LogConfig()) |
| log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0) |
| } |
| |
| /** |
| * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets. |
| */ |
| @Test |
| def testAppendAndReadWithSequentialOffsets() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 71) |
| val log = createLog(logDir, logConfig) |
| val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray |
| |
| for(value <- values) |
| log.appendAsLeader(TestUtils.singletonRecords(value = value), leaderEpoch = 0) |
| |
| for(i <- values.indices) { |
| val read = readLog(log, i, 100, Some(i+1)).records.batches.iterator.next() |
| assertEquals("Offset read should match order appended.", i, read.lastOffset) |
| val actual = read.iterator.next() |
| assertNull("Key should be null", actual.key) |
| assertEquals("Values not equal", ByteBuffer.wrap(values(i)), actual.value) |
| } |
| assertEquals("Reading beyond the last message returns nothing.", 0, |
| readLog(log, values.length, 100).records.batches.asScala.size) |
| } |
| |
| /** |
| * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct |
| * message from any offset less than the logEndOffset, including offsets that were never appended. |
| */ |
| @Test |
| def testAppendAndReadWithNonSequentialOffsets() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 72) |
| val log = createLog(logDir, logConfig) |
| val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray |
| val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) |
| |
| // now test the case that we give the offsets and use non-sequential offsets |
| for(i <- records.indices) |
| log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i))) |
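| // offsets above 50 contain gaps; a read at a missing offset should return the next message at or above it |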
| for(i <- 50 until messageIds.max) { |
| val idx = messageIds.indexWhere(_ >= i) |
| val read = readLog(log, i, 100).records.records.iterator.next() |
| assertEquals("Offset read should match message id.", messageIds(idx), read.offset) |
| assertEquals("Message should match appended.", records(idx), new SimpleRecord(read)) |
| } |
| } |
| |
| /** |
| * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment. |
| * Specifically we create a log where the last message in the first segment has offset 0. If we |
| * then read offset 1, we should expect this read to come from the second segment, even though the |
| * first segment has the greatest lower bound on the offset. |
| */ |
| @Test |
| def testReadAtLogGap() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 300) |
| val log = createLog(logDir, logConfig) |
| |
| // keep appending until we have two segments with only a single message in the second segment |
| while(log.numberOfSegments == 1) |
| log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) |
| |
| // now manually truncate off all but one message from the first segment to create a gap in the messages |
| log.logSegments.head.truncateTo(1) |
| |
| assertEquals("A read should now return the last message in the log", log.logEndOffset - 1, |
| readLog(log, 1, 200).records.batches.iterator.next().lastOffset) |
| } |
| |
| @Test(expected = classOf[KafkaStorageException]) |
| def testLogRollAfterLogHandlerClosed() { |
| val logConfig = LogTest.createLogConfig() |
| val log = createLog(logDir, logConfig) |
| log.closeHandlers() |
| log.roll(Some(1L)) |
| } |
| |
| @Test |
| def testReadWithMinMessage() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 72) |
| val log = createLog(logDir, logConfig) |
| val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray |
| val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) |
| |
| // now test the case that we give the offsets and use non-sequential offsets |
| for (i <- records.indices) |
| log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i))) |
| |
| for (i <- 50 until messageIds.max) { |
| val idx = messageIds.indexWhere(_ >= i) |
| val reads = Seq( |
| readLog(log, i, 1), |
| readLog(log, i, 100), |
| readLog(log, i, 100, Some(10000)) |
| ).map(_.records.records.iterator.next()) |
| reads.foreach { read => |
| assertEquals("Offset read should match message id.", messageIds(idx), read.offset) |
| assertEquals("Message should match appended.", records(idx), new SimpleRecord(read)) |
| } |
| |
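| // with maxOffset = 1, which is below the fetch offset, the read should return no batches |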
| val fetchedData = readLog(log, i, 1, Some(1)) |
| assertEquals(Seq.empty, fetchedData.records.batches.asScala.toIndexedSeq) |
| } |
| } |
| |
| @Test |
| def testReadWithTooSmallMaxLength() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 72) |
| val log = createLog(logDir, logConfig) |
| val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray |
| val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) |
| |
| // now test the case that we give the offsets and use non-sequential offsets |
| for (i <- records.indices) |
| log.appendAsFollower(MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, 0, records(i))) |
| |
| for (i <- 50 until messageIds.max) { |
| assertEquals(MemoryRecords.EMPTY, readLog(log, i, maxLength = 0, minOneMessage = false).records) |
| |
| // we return an incomplete message instead of an empty one for the case below |
| // we use this mechanism to tell consumers of the fetch request version 2 and below that the message size is |
| // larger than the fetch size |
| // in fetch request version 3, we no longer need this as we return oversized messages from the first non-empty |
| // partition |
| val fetchInfo = readLog(log, i, maxLength = 1, minOneMessage = false) |
| assertTrue(fetchInfo.firstEntryIncomplete) |
| assertTrue(fetchInfo.records.isInstanceOf[FileRecords]) |
| assertEquals(1, fetchInfo.records.sizeInBytes) |
| } |
| } |
| |
| /** |
| * Test reading at the boundary of the log, specifically |
| * - reading from the logEndOffset should give an empty message set |
| * - reading from the maxOffset should give an empty message set |
| * - reading beyond the log end offset should throw an OffsetOutOfRangeException |
| */ |
| @Test |
| def testReadOutOfRange() { |
| createEmptyLogs(logDir, 1024) |
| // set up replica log starting with offset 1024 and with one message (at offset 1024) |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0) |
| |
| assertEquals("Reading at the log end offset should produce 0 byte read.", 0, |
| readLog(log, 1025, 1000).records.sizeInBytes) |
| |
| try { |
| readLog(log, 0, 1000) |
| fail("Reading below the log start offset should throw OffsetOutOfRangeException") |
| } catch { |
| case _: OffsetOutOfRangeException => // This is good. |
| } |
| |
| try { |
| readLog(log, 1026, 1000) |
| fail("Reading at beyond the log end offset should throw OffsetOutOfRangeException") |
| } catch { |
| case _: OffsetOutOfRangeException => // This is good. |
| } |
| |
| assertEquals("Reading from below the specified maxOffset should produce 0 byte read.", 0, |
| readLog(log, 1025, 1000, Some(1024)).records.sizeInBytes) |
| } |
| |
| /** |
| * Test that covers reads and writes on a multisegment log. This test appends a bunch of messages |
| * and then reads them all back, checking that each message and offset read matches what was appended. |
| */ |
| @Test |
| def testLogRolls() { |
| /* create a multipart log with 100 messages */ |
| val logConfig = LogTest.createLogConfig(segmentBytes = 100) |
| val log = createLog(logDir, logConfig) |
| val numMessages = 100 |
| val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, |
| timestamp = mockTime.milliseconds)) |
| messageSets.foreach(log.appendAsLeader(_, leaderEpoch = 0)) |
| log.flush() |
| |
| /* do successive reads to ensure all our messages are there */ |
| var offset = 0L |
| for(i <- 0 until numMessages) { |
| val messages = readLog(log, offset, 1024*1024).records.batches |
| val head = messages.iterator.next() |
| assertEquals("Offsets not equal", offset, head.lastOffset) |
| |
| val expected = messageSets(i).records.iterator.next() |
| val actual = head.iterator.next() |
| assertEquals(s"Keys not equal at offset $offset", expected.key, actual.key) |
| assertEquals(s"Values not equal at offset $offset", expected.value, actual.value) |
| assertEquals(s"Timestamps not equal at offset $offset", expected.timestamp, actual.timestamp) |
| offset = head.lastOffset + 1 |
| } |
| val lastRead = readLog(log, startOffset = numMessages, maxLength = 1024*1024, |
| maxOffset = Some(numMessages + 1)).records |
| assertEquals("Should be no more messages", 0, lastRead.records.asScala.size) |
| |
| // check that rolling the log forced a flush; the flush is async, so retry in case of failure |
| TestUtils.retry(1000L){ |
| assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset) |
| } |
| } |
| |
| /** |
| * Test reads at offsets that fall within compressed message set boundaries. |
| */ |
| @Test |
| def testCompressedMessages() { |
| /* this log should roll after every messageset */ |
| val logConfig = LogTest.createLogConfig(segmentBytes = 110) |
| val log = createLog(logDir, logConfig) |
| |
| /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ |
| log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0) |
| log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("alpha".getBytes), new SimpleRecord("beta".getBytes)), leaderEpoch = 0) |
| |
| def read(offset: Int) = readLog(log, offset, 4096).records.records |
| |
| /* we should always get the first message in the compressed set when reading any offset in the set */ |
| assertEquals("Read at offset 0 should produce 0", 0, read(0).iterator.next().offset) |
| assertEquals("Read at offset 1 should produce 0", 0, read(1).iterator.next().offset) |
| assertEquals("Read at offset 2 should produce 2", 2, read(2).iterator.next().offset) |
| assertEquals("Read at offset 3 should produce 2", 2, read(3).iterator.next().offset) |
| } |
| |
| /** |
| * Test garbage collecting old segments |
| */ |
| @Test |
| def testThatGarbageCollectingSegmentsDoesntChangeOffset() { |
| for(messagesToAppend <- List(0, 1, 25)) { |
| logDir.mkdirs() |
| // first test a log segment starting at 0 |
| val logConfig = LogTest.createLogConfig(segmentBytes = 100, retentionMs = 0) |
| val log = createLog(logDir, logConfig) |
| for(i <- 0 until messagesToAppend) |
| log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = mockTime.milliseconds - 10), leaderEpoch = 0) |
| |
| val currOffset = log.logEndOffset |
| assertEquals(currOffset, messagesToAppend) |
| |
| // time goes by; the log file is deleted |
| log.onHighWatermarkIncremented(currOffset) |
| log.deleteOldSegments() |
| |
| assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset) |
| assertEquals("We should still have one segment left", 1, log.numberOfSegments) |
| assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments()) |
| assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset) |
| assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", |
| currOffset, |
| log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = mockTime.milliseconds), leaderEpoch = 0).firstOffset.get) |
| |
| // cleanup the log |
| log.delete() |
| } |
| } |
| |
| /** |
| * The size of a MessageSet shouldn't exceed config.segmentSize; check that this is properly enforced by |
| * appending a message set larger than the config.segmentSize setting and checking that an exception is thrown. |
| */ |
| @Test |
| def testMessageSetSizeCheck() { |
| val messageSet = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes)) |
| // append messages to log |
| val configSegmentSize = messageSet.sizeInBytes - 1 |
| val logConfig = LogTest.createLogConfig(segmentBytes = configSegmentSize) |
| val log = createLog(logDir, logConfig) |
| |
| try { |
| log.appendAsLeader(messageSet, leaderEpoch = 0) |
| fail("message set should throw RecordBatchTooLargeException.") |
| } catch { |
| case _: RecordBatchTooLargeException => // this is good |
| } |
| } |
| |
| @Test |
| def testCompactedTopicConstraints() { |
| val keyedMessage = new SimpleRecord("and here it is".getBytes, "this message has a key".getBytes) |
| val anotherKeyedMessage = new SimpleRecord("another key".getBytes, "this message also has a key".getBytes) |
| val unkeyedMessage = new SimpleRecord("this message does not have a key".getBytes) |
| |
| val messageSetWithUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage, keyedMessage) |
| val messageSetWithOneUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage) |
| val messageSetWithCompressedKeyedMessage = MemoryRecords.withRecords(CompressionType.GZIP, keyedMessage) |
| val messageSetWithCompressedUnkeyedMessage = MemoryRecords.withRecords(CompressionType.GZIP, keyedMessage, unkeyedMessage) |
| |
| val messageSetWithKeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage) |
| val messageSetWithKeyedMessages = MemoryRecords.withRecords(CompressionType.NONE, keyedMessage, anotherKeyedMessage) |
| |
| val logConfig = LogTest.createLogConfig(cleanupPolicy = LogConfig.Compact) |
| val log = createLog(logDir, logConfig) |
| |
| try { |
| log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0) |
| fail("Compacted topics cannot accept a message without a key.") |
| } catch { |
| case _: CorruptRecordException => // this is good |
| } |
| try { |
| log.appendAsLeader(messageSetWithOneUnkeyedMessage, leaderEpoch = 0) |
| fail("Compacted topics cannot accept a message without a key.") |
| } catch { |
| case _: CorruptRecordException => // this is good |
| } |
| try { |
| log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, leaderEpoch = 0) |
| fail("Compacted topics cannot accept a message without a key.") |
| } catch { |
| case _: CorruptRecordException => // this is good |
| } |
| |
| // the following should succeed without any InvalidMessageException |
| log.appendAsLeader(messageSetWithKeyedMessage, leaderEpoch = 0) |
| log.appendAsLeader(messageSetWithKeyedMessages, leaderEpoch = 0) |
| log.appendAsLeader(messageSetWithCompressedKeyedMessage, leaderEpoch = 0) |
| } |
| |
| /** |
| * We have a max size limit on message appends; check that it is properly enforced by appending a message |
| * larger than the limit and checking that an exception is thrown. |
| */ |
| @Test |
| def testMessageSizeCheck() { |
| val first = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes)) |
| val second = MemoryRecords.withRecords(CompressionType.NONE, |
| new SimpleRecord("change (I need more bytes)... blah blah blah.".getBytes), |
| new SimpleRecord("More padding boo hoo".getBytes)) |
| |
| // append messages to log |
| val maxMessageSize = second.sizeInBytes - 1 |
| val logConfig = LogTest.createLogConfig(maxMessageBytes = maxMessageSize) |
| val log = createLog(logDir, logConfig) |
| |
| // should be able to append the small message |
| log.appendAsLeader(first, leaderEpoch = 0) |
| |
| try { |
| log.appendAsLeader(second, leaderEpoch = 0) |
| fail("Second message set should throw MessageSizeTooLargeException.") |
| } catch { |
| case _: RecordTooLargeException => // this is good |
| } |
| } |
| |
| /** |
| * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly. |
| */ |
| @Test |
| def testLogRecoversToCorrectOffset() { |
| val numMessages = 100 |
| val messageSize = 100 |
| val segmentSize = 7 * messageSize |
| val indexInterval = 3 * messageSize |
| val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = indexInterval, segmentIndexBytes = 4096) |
| var log = createLog(logDir, logConfig) |
| for(i <- 0 until numMessages) |
| log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize), |
| timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) |
| assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) |
| val lastIndexOffset = log.activeSegment.offsetIndex.lastOffset |
| val numIndexEntries = log.activeSegment.offsetIndex.entries |
| val lastOffset = log.logEndOffset |
| // After segment is closed, the last entry in the time index should be (largest timestamp -> last offset). |
| val lastTimeIndexOffset = log.logEndOffset - 1 |
| val lastTimeIndexTimestamp = log.activeSegment.largestTimestamp |
| // Depending on when the last time index entry is inserted, an entry may or may not be inserted into the time index. |
| val numTimeIndexEntries = log.activeSegment.timeIndex.entries + { |
| if (log.activeSegment.timeIndex.lastEntry.offset == log.logEndOffset - 1) 0 else 1 |
| } |
| log.close() |
| |
| def verifyRecoveredLog(log: Log, expectedRecoveryPoint: Long) { |
| assertEquals(s"Unexpected recovery point", expectedRecoveryPoint, log.recoveryPoint) |
| assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset) |
| assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.offsetIndex.lastOffset) |
| assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.offsetIndex.entries) |
| assertEquals("Should have same last time index timestamp", lastTimeIndexTimestamp, log.activeSegment.timeIndex.lastEntry.timestamp) |
| assertEquals("Should have same last time index offset", lastTimeIndexOffset, log.activeSegment.timeIndex.lastEntry.offset) |
| assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries) |
| } |
| |
| log = createLog(logDir, logConfig, recoveryPoint = lastOffset) |
| verifyRecoveredLog(log, lastOffset) |
| log.close() |
| |
| // test recovery case |
| log = createLog(logDir, logConfig) |
| verifyRecoveredLog(log, lastOffset) |
| log.close() |
| } |
| |
| /** |
| * Test building the time index on the follower by setting assignOffsets to false. |
| */ |
| @Test |
| def testBuildTimeIndexWhenNotAssigningOffsets() { |
| val numMessages = 100 |
| val logConfig = LogTest.createLogConfig(segmentBytes = 10000, indexIntervalBytes = 1) |
| val log = createLog(logDir, logConfig) |
| |
| val messages = (0 until numMessages).map { i => |
| MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(mockTime.milliseconds + i, i.toString.getBytes())) |
| } |
| messages.foreach(log.appendAsFollower) |
| val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries } |
| assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries) |
| assertEquals(s"The last time index entry should have timestamp ${mockTime.milliseconds + numMessages - 1}", |
| mockTime.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp) |
| } |
| |
| /** |
| * Test that if we manually delete an index segment it is rebuilt when the log is re-opened |
| */ |
| @Test |
| def testIndexRebuild() { |
| // publish the messages and close the log |
| val numMessages = 200 |
| val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) |
| var log = createLog(logDir, logConfig) |
| for(i <- 0 until numMessages) |
| log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) |
| val indexFiles = log.logSegments.map(_.lazyOffsetIndex.file) |
| val timeIndexFiles = log.logSegments.map(_.lazyTimeIndex.file) |
| log.close() |
| |
| // delete all the index files |
| indexFiles.foreach(_.delete()) |
| timeIndexFiles.foreach(_.delete()) |
| |
| // reopen the log |
| log = createLog(logDir, logConfig) |
| assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) |
| assertTrue("The index should have been rebuilt", log.logSegments.head.offsetIndex.entries > 0) |
| assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0) |
| for(i <- 0 until numMessages) { |
| assertEquals(i, readLog(log, i, 100).records.batches.iterator.next().lastOffset) |
| if (i == 0) |
| assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) |
| else |
| assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) |
| } |
| log.close() |
| } |
| |
| @Test |
| def testFetchOffsetByTimestampIncludesLeaderEpoch(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) |
| val log = createLog(logDir, logConfig) |
| |
| assertEquals(None, log.fetchOffsetByTimestamp(0L)) |
| |
| val firstTimestamp = mockTime.milliseconds |
| val firstLeaderEpoch = 0 |
| log.appendAsLeader(TestUtils.singletonRecords( |
| value = TestUtils.randomBytes(10), |
| timestamp = firstTimestamp), |
| leaderEpoch = firstLeaderEpoch) |
| |
| val secondTimestamp = firstTimestamp + 1 |
| val secondLeaderEpoch = 1 |
| log.appendAsLeader(TestUtils.singletonRecords( |
| value = TestUtils.randomBytes(10), |
| timestamp = secondTimestamp), |
| leaderEpoch = secondLeaderEpoch) |
| |
| assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), |
| log.fetchOffsetByTimestamp(firstTimestamp)) |
| assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), |
| log.fetchOffsetByTimestamp(secondTimestamp)) |
| |
| assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))), |
| log.fetchOffsetByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)) |
| assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))), |
| log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)) |
| |
| // The cache can be updated directly after a leader change. |
| // The new latest offset should reflect the updated epoch. |
| log.maybeAssignEpochStartOffset(2, 2L) |
| |
| assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))), |
| log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)) |
| } |
| |
| /** |
| * Test that if the message format version of the messages in a segment is before 0.10.0, the time index should be empty. |
| */ |
| @Test |
| def testRebuildTimeIndexForOldMessages() { |
| val numMessages = 200 |
| val segmentSize = 200 |
| val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = 1, messageFormatVersion = "0.9.0") |
| var log = createLog(logDir, logConfig) |
| for (i <- 0 until numMessages) |
| log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), |
| timestamp = mockTime.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0) |
| val timeIndexFiles = log.logSegments.map(_.lazyTimeIndex.file) |
| log.close() |
| |
| // Delete the time index. |
| timeIndexFiles.foreach(file => Files.delete(file.toPath)) |
| |
| // The rebuilt time index should be empty |
| log = createLog(logDir, logConfig, recoveryPoint = numMessages + 1) |
| for (segment <- log.logSegments.init) { |
| assertEquals("The time index should be empty", 0, segment.timeIndex.entries) |
| assertEquals("The time index file size should be 0", 0, segment.lazyTimeIndex.file.length) |
| } |
| } |
| |
| /** |
| * Test that if we have corrupted an index segment it is rebuilt when the log is re-opened |
| */ |
| @Test |
| def testCorruptIndexRebuild() { |
| // publish the messages and close the log |
| val numMessages = 200 |
| val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) |
| var log = createLog(logDir, logConfig) |
| for(i <- 0 until numMessages) |
| log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = mockTime.milliseconds + i * 10), leaderEpoch = 0) |
| val indexFiles = log.logSegments.map(_.lazyOffsetIndex.file) |
| val timeIndexFiles = log.logSegments.map(_.lazyTimeIndex.file) |
| log.close() |
| |
| // corrupt all the index files |
| for( file <- indexFiles) { |
| val bw = new BufferedWriter(new FileWriter(file)) |
| bw.write(" ") |
| bw.close() |
| } |
| |
| // corrupt all the time index files |
| for( file <- timeIndexFiles) { |
| val bw = new BufferedWriter(new FileWriter(file)) |
| bw.write(" ") |
| bw.close() |
| } |
| |
| // reopen the log with recovery point=0 so that the segment recovery can be triggered |
| log = createLog(logDir, logConfig) |
| assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) |
| for(i <- 0 until numMessages) { |
| assertEquals(i, readLog(log, i, 100).records.batches.iterator.next().lastOffset) |
| if (i == 0) |
| assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) |
| else |
| assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset) |
| } |
| log.close() |
| } |
| |
| /** |
| * Test the Log truncate operations |
| */ |
| @Test |
| def testTruncateTo() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| val setSize = createRecords.sizeInBytes |
| val msgPerSeg = 10 |
| val segmentSize = msgPerSeg * setSize // each segment will be 10 messages |
| |
| // create a log |
| val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize) |
| val log = createLog(logDir, logConfig) |
| assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) |
| |
| for (_ <- 1 to msgPerSeg) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments) |
| assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset) |
| |
| val lastOffset = log.logEndOffset |
| val size = log.size |
| log.truncateTo(log.logEndOffset) // keep the entire log |
| assertEquals("Should not change offset", lastOffset, log.logEndOffset) |
| assertEquals("Should not change log size", size, log.size) |
| log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset |
| assertEquals("Should not change offset but should log error", lastOffset, log.logEndOffset) |
| assertEquals("Should not change log size", size, log.size) |
| log.truncateTo(msgPerSeg/2) // truncate somewhere in between |
| assertEquals("Should change offset", log.logEndOffset, msgPerSeg/2) |
| assertTrue("Should change log size", log.size < size) |
| log.truncateTo(0) // truncate the entire log |
| assertEquals("Should change offset", 0, log.logEndOffset) |
| assertEquals("Should change log size", 0, log.size) |
| |
| for (_ <- 1 to msgPerSeg) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| assertEquals("Should be back to original offset", log.logEndOffset, lastOffset) |
| assertEquals("Should be back to original size", log.size, size) |
| log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1)) |
| assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1)) |
| assertEquals("Should change log size", log.size, 0) |
| |
| for (_ <- 1 to msgPerSeg) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| assertTrue("Should be ahead of to original offset", log.logEndOffset > msgPerSeg) |
| assertEquals("log size should be same as before", size, log.size) |
| log.truncateTo(0) // truncate before first start offset in the log |
| assertEquals("Should change offset", 0, log.logEndOffset) |
| assertEquals("Should change log size", log.size, 0) |
| } |
| |
| /** |
| * Verify that when we truncate a log the index of the last segment is resized to the max index size to allow more appends |
| */ |
| @Test |
| def testIndexResizingAtTruncation() { |
| val setSize = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds).sizeInBytes |
| val msgPerSeg = 10 |
| val segmentSize = msgPerSeg * setSize // each segment will be 10 messages |
| val logConfig = LogTest.createLogConfig(segmentBytes = segmentSize, indexIntervalBytes = setSize - 1) |
| val log = createLog(logDir, logConfig) |
| assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) |
| |
| for (i<- 1 to msgPerSeg) |
| log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) |
| assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) |
| |
| mockTime.sleep(msgPerSeg) |
| for (i<- 1 to msgPerSeg) |
| log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) |
| assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments) |
| val expectedEntries = msgPerSeg - 1 |
| |
| assertEquals(s"The index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.offsetIndex.maxEntries) |
| assertEquals(s"The time index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.timeIndex.maxEntries) |
| |
| log.truncateTo(0) |
| assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) |
| assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.offsetIndex.maxEntries) |
| assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries) |
| |
| mockTime.sleep(msgPerSeg) |
| for (i<- 1 to msgPerSeg) |
| log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds + i), leaderEpoch = 0) |
| assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) |
| } |
| |
| /** |
| * When we open a log any index segments without an associated log segment should be deleted. |
| */ |
| @Test |
| def testBogusIndexSegmentsAreRemoved() { |
| val bogusIndex1 = Log.offsetIndexFile(logDir, 0) |
| val bogusTimeIndex1 = Log.timeIndexFile(logDir, 0) |
| val bogusIndex2 = Log.offsetIndexFile(logDir, 5) |
| val bogusTimeIndex2 = Log.timeIndexFile(logDir, 5) |
| |
| // The files remain absent until we first access them because the time index and offset index files are |
| // loaded lazily, but in this test case we need to create these files up front in order to verify that |
| // they are removed. |
| bogusIndex2.createNewFile() |
| bogusTimeIndex2.createNewFile() |
| |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1) |
| val log = createLog(logDir, logConfig) |
| |
| // Force the segment to access the index files because we are doing index lazy loading. |
| log.logSegments.toSeq.head.offsetIndex |
| log.logSegments.toSeq.head.timeIndex |
| |
| assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) |
| assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0) |
| assertFalse("The second index file should have been deleted.", bogusIndex2.exists) |
| assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists) |
| |
| // check that we can append to the log |
| for (_ <- 0 until 10) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.delete() |
| } |
| |
| /** |
| * Verify that truncation works correctly after re-opening the log |
| */ |
| @Test |
| def testReopenThenTruncate() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| // create a log |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000) |
| var log = createLog(logDir, logConfig) |
| |
| // add enough messages to roll over several segments then close and re-open and attempt to truncate |
| for (_ <- 0 until 100) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| log.close() |
| log = createLog(logDir, logConfig) |
| log.truncateTo(3) |
| assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) |
| assertEquals("Log end offset should be 3.", 3, log.logEndOffset) |
| } |
| |
| /** |
| * Test that files scheduled for deletion are physically removed after the appropriate delay. |
| */ |
| @Test |
| def testAsyncDelete() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000L) |
| val asyncDeleteMs = 1000 |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000, |
| retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 100) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| // files should be renamed |
| val segments = log.logSegments.toArray |
| val oldFiles = segments.map(_.log.file) ++ segments.map(_.lazyOffsetIndex.file) |
| |
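| // segments only become eligible for deletion once the high watermark has advanced past them, |
| // so move it to the log end offset before triggering retention |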
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| |
| assertEquals("Only one segment should remain.", 1, log.numberOfSegments) |
| assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && |
| segments.forall(_.lazyOffsetIndex.file.getName.endsWith(Log.DeletedFileSuffix))) |
| assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) && |
| segments.forall(_.lazyOffsetIndex.file.exists)) |
| assertTrue("The original file should be gone.", oldFiles.forall(!_.exists)) |
| |
| // when enough time passes the files should be deleted |
| val deletedFiles = segments.map(_.log.file) ++ segments.map(_.lazyOffsetIndex.file) |
| mockTime.sleep(asyncDeleteMs + 1) |
| assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists)) |
| } |
| |
| /** |
| * Any files ending in .deleted should be removed when the log is re-opened. |
| */ |
| @Test |
| def testOpenDeletesObsoleteFiles() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) |
| var log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 100) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| // expire all segments |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| log.close() |
| log = createLog(logDir, logConfig) |
| assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) |
| } |
| |
| @Test |
| def testAppendMessageWithNullPayload() { |
| val log = createLog(logDir, LogConfig()) |
| log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0) |
| val head = readLog(log, 0, 4096).records.records.iterator.next() |
| assertEquals(0, head.offset) |
| assertTrue("Message payload should be null.", !head.hasValue) |
| } |
| |
| @Test |
| def testAppendWithOutOfOrderOffsetsThrowsException() { |
| val log = createLog(logDir, LogConfig()) |
| |
| val appendOffsets = Seq(0L, 1L, 3L, 2L, 4L) |
| val buffer = ByteBuffer.allocate(512) |
| for (offset <- appendOffsets) { |
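| // the arguments after the log append time are the producer id (1L), producer epoch (0), base sequence (0), |
| // isTransactional flag (false) and partition leader epoch (0); the varying baseOffset is what produces the |
| // out-of-order offsets 0, 1, 3, 2, 4 |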
| val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, |
| TimestampType.LOG_APPEND_TIME, offset, mockTime.milliseconds(), |
| 1L, 0, 0, false, 0) |
| builder.append(new SimpleRecord("key".getBytes, "value".getBytes)) |
| builder.close() |
| } |
| buffer.flip() |
| val memoryRecords = MemoryRecords.readableRecords(buffer) |
| |
| assertThrows[OffsetsOutOfOrderException] { |
| log.appendAsFollower(memoryRecords) |
| } |
| } |
| |
| @Test |
| def testAppendBelowExpectedOffsetThrowsException() { |
| val log = createLog(logDir, LogConfig()) |
| val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray |
| records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0)) |
| |
| val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) |
| val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4) |
| for (magic <- magicVals; compression <- compressionTypes) { |
| val invalidRecord = MemoryRecords.withRecords(magic, compression, new SimpleRecord(1.toString.getBytes)) |
| withClue(s"Magic=$magic, compressionType=$compression") { |
| assertThrows[UnexpectedAppendOffsetException] { |
| log.appendAsFollower(invalidRecord) |
| } |
| } |
| } |
| } |
| |
| @Test |
| def testAppendEmptyLogBelowLogStartOffsetThrowsException() { |
| createEmptyLogs(logDir, 7) |
| val log = createLog(logDir, LogConfig(), brokerTopicStats = brokerTopicStats) |
| assertEquals(7L, log.logStartOffset) |
| assertEquals(7L, log.logEndOffset) |
| |
| val firstOffset = 4L |
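| // 4L is below the log start offset of 7, so appendAsFollower must reject these batches |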
| val magicVals = Seq(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2) |
| val compressionTypes = Seq(CompressionType.NONE, CompressionType.LZ4) |
| for (magic <- magicVals; compression <- compressionTypes) { |
| val batch = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes), |
| new SimpleRecord("k2".getBytes, "v2".getBytes), |
| new SimpleRecord("k3".getBytes, "v3".getBytes)), |
| magicValue = magic, codec = compression, |
| baseOffset = firstOffset) |
| |
| withClue(s"Magic=$magic, compressionType=$compression") { |
| val exception = intercept[UnexpectedAppendOffsetException] { |
| log.appendAsFollower(records = batch) |
| } |
| assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#firstOffset", |
| firstOffset, exception.firstOffset) |
| assertEquals(s"Magic=$magic, compressionType=$compression, UnexpectedAppendOffsetException#lastOffset", |
| firstOffset + 2, exception.lastOffset) |
| } |
| } |
| } |
| |
| @Test |
| def testAppendWithNoTimestamp(): Unit = { |
| val log = createLog(logDir, LogConfig()) |
| log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, |
| new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0) |
| } |
| |
| @Test |
| def testCorruptLog() { |
| // append some messages to create some segments |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| val recoveryPoint = 50L |
| for (_ <- 0 until 10) { |
| // create a log and write some messages to it |
| logDir.mkdirs() |
| var log = createLog(logDir, logConfig) |
| val numMessages = 50 + TestUtils.random.nextInt(50) |
| for (_ <- 0 until numMessages) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| val records = log.logSegments.flatMap(_.log.records.asScala.toList).toList |
| log.close() |
| |
| // corrupt index and log by appending random bytes |
| TestUtils.appendNonsenseToFile(log.activeSegment.lazyOffsetIndex.file, TestUtils.random.nextInt(1024) + 1) |
| TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) |
| |
| // attempt recovery |
| log = createLog(logDir, logConfig, brokerTopicStats, 0L, recoveryPoint) |
| assertEquals(numMessages, log.logEndOffset) |
| |
| val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList |
| assertEquals(records.size, recovered.size) |
| |
| for (i <- records.indices) { |
| val expected = records(i) |
| val actual = recovered(i) |
| assertEquals(s"Keys not equal", expected.key, actual.key) |
| assertEquals(s"Values not equal", expected.value, actual.value) |
| assertEquals(s"Timestamps not equal", expected.timestamp, actual.timestamp) |
| } |
| |
| Utils.delete(logDir) |
| } |
| } |
| |
| @Test |
| def testOverCompactedLogRecovery(): Unit = { |
| // append some messages to create some segments |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes())) |
| val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes())) |
| val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 3, CompressionType.NONE, 0, new SimpleRecord("v4".getBytes(), "k4".getBytes())) |
| val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.NONE, 0, new SimpleRecord("v5".getBytes(), "k5".getBytes())) |
| //Writes into an empty log with baseOffset 0 |
| log.appendAsFollower(set1) |
| assertEquals(0L, log.activeSegment.baseOffset) |
| //This write will roll the segment, yielding a new segment with base offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2 |
| log.appendAsFollower(set2) |
| assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset) |
| assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists) |
| //This will go into the existing log |
| log.appendAsFollower(set3) |
| assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset) |
| //This will go into the existing log |
| log.appendAsFollower(set4) |
| assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset) |
| log.close() |
| val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index")) |
| assertEquals(2, indexFiles.length) |
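| // open each offset index using the base offset parsed from its file name and sanity-check it |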
| for (file <- indexFiles) { |
| val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong) |
| assertTrue(offsetIndex.lastOffset >= 0) |
| offsetIndex.close() |
| } |
| Utils.delete(logDir) |
| } |
| |
| @Test |
| def testWriteLeaderEpochCheckpointAfterDirectoryRename(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) |
| assertEquals(Some(5), log.latestEpoch) |
| |
| // Ensure that after a directory rename, the epoch cache is written to the right location |
| val tp = Log.parseTopicPartitionName(log.dir) |
| log.renameDir(Log.logDeleteDirName(tp)) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 10) |
| assertEquals(Some(10), log.latestEpoch) |
| assertTrue(LeaderEpochCheckpointFile.newFile(log.dir).exists()) |
| assertFalse(LeaderEpochCheckpointFile.newFile(this.logDir).exists()) |
| } |
| |
| @Test |
| def testLeaderEpochCacheClearedAfterDowngradeInAppendedMessages(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) |
| assertEquals(Some(5), log.leaderEpochCache.flatMap(_.latestEpoch)) |
| |
| log.appendAsFollower(TestUtils.records(List(new SimpleRecord("foo".getBytes())), |
| baseOffset = 1L, |
| magicValue = RecordVersion.V1.value)) |
| assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch)) |
| } |
| |
| @Test |
| def testLeaderEpochCacheClearedAfterStaticMessageFormatDowngrade(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) |
| assertEquals(Some(5), log.latestEpoch) |
| log.close() |
| |
| // reopen the log with an older message format version and check the cache |
| val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, |
| maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion) |
| val reopened = createLog(logDir, downgradedLogConfig) |
| assertLeaderEpochCacheEmpty(reopened) |
| |
| reopened.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), |
| magicValue = RecordVersion.V1.value), leaderEpoch = 5) |
| assertLeaderEpochCacheEmpty(reopened) |
| } |
| |
| @Test |
| def testLeaderEpochCacheClearedAfterDynamicMessageFormatDowngrade(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) |
| assertEquals(Some(5), log.latestEpoch) |
| |
| val downgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, |
| maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion) |
| log.updateConfig(Set(LogConfig.MessageFormatVersionProp), downgradedLogConfig) |
| assertLeaderEpochCacheEmpty(log) |
| |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), |
| magicValue = RecordVersion.V1.value), leaderEpoch = 5) |
| assertLeaderEpochCacheEmpty(log) |
| } |
| |
| @Test |
| def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, |
| maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_10_2_IV0.shortVersion) |
| val log = createLog(logDir, logConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("bar".getBytes())), |
| magicValue = RecordVersion.V1.value), leaderEpoch = 5) |
| assertLeaderEpochCacheEmpty(log) |
| |
| val upgradedLogConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, |
| maxMessageBytes = 64 * 1024, messageFormatVersion = kafka.api.KAFKA_0_11_0_IV0.shortVersion) |
| log.updateConfig(Set(LogConfig.MessageFormatVersionProp), upgradedLogConfig) |
| log.appendAsLeader(TestUtils.records(List(new SimpleRecord("foo".getBytes()))), leaderEpoch = 5) |
| assertEquals(Some(5), log.latestEpoch) |
| } |
| |
| private def assertLeaderEpochCacheEmpty(log: Log): Unit = { |
| assertEquals(None, log.leaderEpochCache) |
| assertEquals(None, log.latestEpoch) |
| assertFalse(LeaderEpochCheckpointFile.newFile(log.dir).exists()) |
| } |
| |
| @Test |
| def testOverCompactedLogRecoveryMultiRecord(): Unit = { |
| // append some messages to create some segments |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes())) |
| val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, 0, |
| new SimpleRecord("v3".getBytes(), "k3".getBytes()), |
| new SimpleRecord("v4".getBytes(), "k4".getBytes())) |
| val set3 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 4, CompressionType.GZIP, 0, |
| new SimpleRecord("v5".getBytes(), "k5".getBytes()), |
| new SimpleRecord("v6".getBytes(), "k6".getBytes())) |
| val set4 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 6, CompressionType.GZIP, 0, |
| new SimpleRecord("v7".getBytes(), "k7".getBytes()), |
| new SimpleRecord("v8".getBytes(), "k8".getBytes())) |
| //Writes into an empty log with baseOffset 0 |
| log.appendAsFollower(set1) |
| assertEquals(0L, log.activeSegment.baseOffset) |
| //This write will roll the segment, yielding a new segment with base offset = max(1, Integer.MAX_VALUE+2) = Integer.MAX_VALUE+2 |
| log.appendAsFollower(set2) |
| assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset) |
| assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 2).exists) |
| //This will go into the existing log |
| log.appendAsFollower(set3) |
| assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset) |
| //This will go into the existing log |
| log.appendAsFollower(set4) |
| assertEquals(Integer.MAX_VALUE.toLong + 2, log.activeSegment.baseOffset) |
| log.close() |
| val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index")) |
| assertEquals(2, indexFiles.length) |
| for (file <- indexFiles) { |
| val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong) |
| assertTrue(offsetIndex.lastOffset >= 0) |
| offsetIndex.close() |
| } |
| Utils.delete(logDir) |
| } |
| |
| @Test |
| def testOverCompactedLogRecoveryMultiRecordV1(): Unit = { |
| // append some messages to create some segments |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| val log = createLog(logDir, logConfig) |
| val set1 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0, CompressionType.NONE, |
| new SimpleRecord("v1".getBytes(), "k1".getBytes())) |
| val set2 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 2, CompressionType.GZIP, |
| new SimpleRecord("v3".getBytes(), "k3".getBytes()), |
| new SimpleRecord("v4".getBytes(), "k4".getBytes())) |
| val set3 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 4, CompressionType.GZIP, |
| new SimpleRecord("v5".getBytes(), "k5".getBytes()), |
| new SimpleRecord("v6".getBytes(), "k6".getBytes())) |
| val set4 = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, Integer.MAX_VALUE.toLong + 6, CompressionType.GZIP, |
| new SimpleRecord("v7".getBytes(), "k7".getBytes()), |
| new SimpleRecord("v8".getBytes(), "k8".getBytes())) |
| //Writes into an empty log with baseOffset 0 |
| log.appendAsFollower(set1) |
| assertEquals(0L, log.activeSegment.baseOffset) |
| //This write will roll the segment, yielding a new segment with base offset = max(1, 3) = 3 |
| log.appendAsFollower(set2) |
| assertEquals(3, log.activeSegment.baseOffset) |
| assertTrue(Log.producerSnapshotFile(logDir, 3).exists) |
| //This will also roll the segment, yielding a new segment with base offset = max(5, Integer.MAX_VALUE+4) = Integer.MAX_VALUE+4 |
| log.appendAsFollower(set3) |
| assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset) |
| assertTrue(Log.producerSnapshotFile(logDir, Integer.MAX_VALUE.toLong + 4).exists) |
| //This will go into the existing log |
| log.appendAsFollower(set4) |
| assertEquals(Integer.MAX_VALUE.toLong + 4, log.activeSegment.baseOffset) |
| log.close() |
| val indexFiles = logDir.listFiles.filter(file => file.getName.contains(".index")) |
| assertEquals(3, indexFiles.length) |
| for (file <- indexFiles) { |
| val offsetIndex = new OffsetIndex(file, file.getName.replace(".index","").toLong) |
| assertTrue(offsetIndex.lastOffset >= 0) |
| offsetIndex.close() |
| } |
| Utils.delete(logDir) |
| } |
| |
| @Test |
| def testSplitOnOffsetOverflow(): Unit = { |
| // create a log such that one log segment has offsets that overflow, and call the split API on that segment |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig) |
| assertTrue("At least one segment must have offset overflow", LogTest.hasOffsetOverflow(log)) |
| |
| val allRecordsBeforeSplit = LogTest.allRecords(log) |
| |
| // split the segment with overflow |
| log.splitOverflowedSegment(segmentWithOverflow) |
| |
| // assert we were successfully able to split the segment |
| assertEquals(4, log.numberOfSegments) |
| LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit) |
| |
| // verify we do not have offset overflow anymore |
| assertFalse(LogTest.hasOffsetOverflow(log)) |
| } |
| |
| @Test |
| def testDegenerateSegmentSplit(): Unit = { |
| // This tests a scenario where all of the batches appended to a segment have overflowed. |
| // When we split the overflowed segment, only one new segment will be created. |
| |
| val overflowOffset = Int.MaxValue + 1L |
| val batch1 = MemoryRecords.withRecords(overflowOffset, CompressionType.NONE, 0, |
| new SimpleRecord("a".getBytes)) |
| val batch2 = MemoryRecords.withRecords(overflowOffset + 1, CompressionType.NONE, 0, |
| new SimpleRecord("b".getBytes)) |
| |
| testDegenerateSplitSegmentWithOverflow(segmentBaseOffset = 0L, List(batch1, batch2)) |
| } |
| |
| @Test |
| def testDegenerateSegmentSplitWithOutOfRangeBatchLastOffset(): Unit = { |
| // Degenerate case where the only batch in the segment overflows. In this scenario, |
| // the first offset of the batch is valid, but the last overflows. |
| |
| val firstBatchBaseOffset = Int.MaxValue - 1 |
| val records = MemoryRecords.withRecords(firstBatchBaseOffset, CompressionType.NONE, 0, |
| new SimpleRecord("a".getBytes), |
| new SimpleRecord("b".getBytes), |
| new SimpleRecord("c".getBytes)) |
| |
| testDegenerateSplitSegmentWithOverflow(segmentBaseOffset = 0L, List(records)) |
| } |
| |
| private def testDegenerateSplitSegmentWithOverflow(segmentBaseOffset: Long, records: List[MemoryRecords]): Unit = { |
| val segment = LogTest.rawSegment(logDir, segmentBaseOffset) |
| // Need to create the index files explicitly to avoid triggering segment recovery, which would truncate the segment. |
| Log.offsetIndexFile(logDir, segmentBaseOffset).createNewFile() |
| Log.timeIndexFile(logDir, segmentBaseOffset).createNewFile() |
| records.foreach(segment.append _) |
| segment.close() |
| |
| // Create clean shutdown file so that we do not split during the load |
| createCleanShutdownFile() |
| |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue) |
| |
| val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse { |
| Assertions.fail("Failed to create log with a segment which has overflowed offsets") |
| } |
| |
| val allRecordsBeforeSplit = LogTest.allRecords(log) |
| log.splitOverflowedSegment(segmentWithOverflow) |
| |
| assertEquals(1, log.numberOfSegments) |
| |
| val firstBatchBaseOffset = records.head.batches.asScala.head.baseOffset |
| assertEquals(firstBatchBaseOffset, log.activeSegment.baseOffset) |
| LogTest.verifyRecordsInLog(log, allRecordsBeforeSplit) |
| |
| assertFalse(LogTest.hasOffsetOverflow(log)) |
| } |
| |
| @Test |
| def testRecoveryOfSegmentWithOffsetOverflow(): Unit = { |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, _) = createLogWithOffsetOverflow(logConfig) |
| val expectedKeys = LogTest.keysInLog(log) |
| |
| // Run recovery on the log. This should split the segment underneath. Ignore .deleted files as we could still |
| // have them lying around after the split. |
| val recoveredLog = recoverAndCheck(logConfig, expectedKeys) |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| |
| // Running split again would throw an error |
| for (segment <- recoveredLog.logSegments) { |
| try { |
| log.splitOverflowedSegment(segment) |
| fail() |
| } catch { |
| case _: IllegalArgumentException => |
| } |
| } |
| } |
| |
| @Test |
| def testRecoveryAfterCrashDuringSplitPhase1(): Unit = { |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig) |
| val expectedKeys = LogTest.keysInLog(log) |
| val numSegmentsInitial = log.logSegments.size |
| |
| // Split the segment |
| val newSegments = log.splitOverflowedSegment(segmentWithOverflow) |
| |
| // Simulate recovery just after the .cleaned files are created, before the rename to .swap. On recovery, the existing |
| // split operation is aborted but the recovery process itself kicks off a split which should complete. |
| newSegments.reverse.foreach(segment => { |
| segment.changeFileSuffixes("", Log.CleanedFileSuffix) |
| segment.truncateTo(0) |
| }) |
| for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) |
| Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) |
| |
| val recoveredLog = recoverAndCheck(logConfig, expectedKeys) |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| assertEquals(numSegmentsInitial + 1, recoveredLog.logSegments.size) |
| recoveredLog.close() |
| } |
| |
| @Test |
| def testRecoveryAfterCrashDuringSplitPhase2(): Unit = { |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig) |
| val expectedKeys = LogTest.keysInLog(log) |
| val numSegmentsInitial = log.logSegments.size |
| |
| // Split the segment |
| val newSegments = log.splitOverflowedSegment(segmentWithOverflow) |
| |
| // Simulate recovery just after one of the new segments has been renamed to .swap. On recovery, the existing split |
| // operation is aborted but the recovery process itself kicks off a split which should complete. |
| newSegments.reverse.foreach { segment => |
| if (segment != newSegments.last) |
| segment.changeFileSuffixes("", Log.CleanedFileSuffix) |
| else |
| segment.changeFileSuffixes("", Log.SwapFileSuffix) |
| segment.truncateTo(0) |
| } |
| for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) |
| Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) |
| |
| val recoveredLog = recoverAndCheck(logConfig, expectedKeys) |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| assertEquals(numSegmentsInitial + 1, recoveredLog.logSegments.size) |
| recoveredLog.close() |
| } |
| |
| @Test |
| def testRecoveryAfterCrashDuringSplitPhase3(): Unit = { |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig) |
| val expectedKeys = LogTest.keysInLog(log) |
| val numSegmentsInitial = log.logSegments.size |
| |
| // Split the segment |
| val newSegments = log.splitOverflowedSegment(segmentWithOverflow) |
| |
| // Simulate recovery right after all new segments have been renamed to .swap. On recovery, existing split operation |
| // is completed and the old segment must be deleted. |
| newSegments.reverse.foreach(segment => { |
| segment.changeFileSuffixes("", Log.SwapFileSuffix) |
| }) |
| for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) |
| Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) |
| |
| // Truncate the old segment |
| segmentWithOverflow.truncateTo(0) |
| |
| val recoveredLog = recoverAndCheck(logConfig, expectedKeys) |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| assertEquals(numSegmentsInitial + 1, recoveredLog.logSegments.size) |
| recoveredLog.close() |
| } |
| |
| @Test |
| def testRecoveryAfterCrashDuringSplitPhase4(): Unit = { |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig) |
| val expectedKeys = LogTest.keysInLog(log) |
| val numSegmentsInitial = log.logSegments.size |
| |
| // Split the segment |
| val newSegments = log.splitOverflowedSegment(segmentWithOverflow) |
| |
| // Simulate recovery right after all new segments have been renamed to .swap and old segment has been deleted. On |
| // recovery, existing split operation is completed. |
| newSegments.reverse.foreach(_.changeFileSuffixes("", Log.SwapFileSuffix)) |
| |
| for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) |
| Utils.delete(file) |
| |
| // Truncate the old segment |
| segmentWithOverflow.truncateTo(0) |
| |
| val recoveredLog = recoverAndCheck(logConfig, expectedKeys) |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| assertEquals(numSegmentsInitial + 1, recoveredLog.logSegments.size) |
| recoveredLog.close() |
| } |
| |
| @Test |
| def testRecoveryAfterCrashDuringSplitPhase5(): Unit = { |
| val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000) |
| val (log, segmentWithOverflow) = createLogWithOffsetOverflow(logConfig) |
| val expectedKeys = LogTest.keysInLog(log) |
| val numSegmentsInitial = log.logSegments.size |
| |
| // Split the segment |
| val newSegments = log.splitOverflowedSegment(segmentWithOverflow) |
| |
| // Simulate recovery right after one of the new segments has been renamed to .swap and the other to .log. On |
| // recovery, existing split operation is completed. |
| newSegments.last.changeFileSuffixes("", Log.SwapFileSuffix) |
| |
| // Truncate the old segment |
| segmentWithOverflow.truncateTo(0) |
| |
| val recoveredLog = recoverAndCheck(logConfig, expectedKeys) |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| assertEquals(numSegmentsInitial + 1, recoveredLog.logSegments.size) |
| recoveredLog.close() |
| } |
| |
| @Test |
| def testCleanShutdownFile() { |
| // append some messages to create some segments |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1000, indexIntervalBytes = 1, maxMessageBytes = 64 * 1024) |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) |
| |
| val cleanShutdownFile = createCleanShutdownFile() |
| assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) |
| var recoveryPoint = 0L |
| // create a log and write some messages to it |
| var log = createLog(logDir, logConfig) |
| for (_ <- 0 until 100) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| log.close() |
| |
| // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the |
| // clean shutdown file exists. |
| recoveryPoint = log.logEndOffset |
| log = createLog(logDir, logConfig) |
| assertEquals(recoveryPoint, log.logEndOffset) |
| Utils.delete(cleanShutdownFile) |
| } |
| |
| @Test |
| def testParseTopicPartitionName() { |
| val topic = "test_topic" |
| val partition = "143" |
| val dir = new File(logDir, topicPartitionName(topic, partition)) |
| val topicPartition = Log.parseTopicPartitionName(dir) |
| assertEquals(topic, topicPartition.topic) |
| assertEquals(partition.toInt, topicPartition.partition) |
| } |
| |
| /** |
| * Tests that log directories with a period in their name that have been marked for deletion |
| * are parsed correctly by `Log.parseTopicPartitionName` (see KAFKA-5232 for details). |
| */ |
| @Test |
| def testParseTopicPartitionNameWithPeriodForDeletedTopic() { |
| val topic = "foo.bar-testtopic" |
| val partition = "42" |
| val dir = new File(logDir, Log.logDeleteDirName(new TopicPartition(topic, partition.toInt))) |
| val topicPartition = Log.parseTopicPartitionName(dir) |
| assertEquals("Unexpected topic name parsed", topic, topicPartition.topic) |
| assertEquals("Unexpected partition number parsed", partition.toInt, topicPartition.partition) |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForEmptyName() { |
| try { |
| val dir = new File("") |
| Log.parseTopicPartitionName(dir) |
| fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForNull() { |
| try { |
| val dir: File = null |
| Log.parseTopicPartitionName(dir) |
| fail("KafkaException should have been thrown for dir: " + dir) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForMissingSeparator() { |
| val topic = "test_topic" |
| val partition = "1999" |
| val dir = new File(logDir, topic + partition) |
| try { |
| Log.parseTopicPartitionName(dir) |
| fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| // also test the "-delete" marker case |
| val deleteMarkerDir = new File(logDir, topic + partition + "." + DeleteDirSuffix) |
| try { |
| Log.parseTopicPartitionName(deleteMarkerDir) |
| fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForMissingTopic() { |
| val topic = "" |
| val partition = "1999" |
| val dir = new File(logDir, topicPartitionName(topic, partition)) |
| try { |
| Log.parseTopicPartitionName(dir) |
| fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| // also test the "-delete" marker case |
| val deleteMarkerDir = new File(logDir, Log.logDeleteDirName(new TopicPartition(topic, partition.toInt))) |
| try { |
| Log.parseTopicPartitionName(deleteMarkerDir) |
| fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForMissingPartition() { |
| val topic = "test_topic" |
| val partition = "" |
| val dir = new File(logDir, topicPartitionName(topic, partition)) |
| try { |
| Log.parseTopicPartitionName(dir) |
| fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| // also test the "-delete" marker case |
| val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + DeleteDirSuffix) |
| try { |
| Log.parseTopicPartitionName(deleteMarkerDir) |
| fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForInvalidPartition() { |
| val topic = "test_topic" |
| val partition = "1999a" |
| val dir = new File(logDir, topicPartitionName(topic, partition)) |
| try { |
| Log.parseTopicPartitionName(dir) |
| fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| // also test the "-delete" marker case |
| val deleteMarkerDir = new File(logDir, topic + partition + "." + DeleteDirSuffix) |
| try { |
| Log.parseTopicPartitionName(deleteMarkerDir) |
| fail("KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // expected |
| } |
| } |
| |
| @Test |
| def testParseTopicPartitionNameForExistingInvalidDir() { |
| val dir1 = new File(logDir + "/non_kafka_dir") |
| try { |
| Log.parseTopicPartitionName(dir1) |
| fail("KafkaException should have been thrown for dir: " + dir1.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // should only throw KafkaException |
| } |
| val dir2 = new File(logDir + "/non_kafka_dir-delete") |
| try { |
| Log.parseTopicPartitionName(dir2) |
| fail("KafkaException should have been thrown for dir: " + dir2.getCanonicalPath) |
| } catch { |
| case _: KafkaException => // should only throw KafkaException |
| } |
| } |
| |
| def topicPartitionName(topic: String, partition: String): String = |
| topic + "-" + partition |
| |
| @Test |
| def testDeleteOldSegments() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 100) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.maybeAssignEpochStartOffset(0, 40) |
| log.maybeAssignEpochStartOffset(1, 90) |
| |
| // segments are not eligible for deletion if no high watermark has been set |
| val numSegments = log.numberOfSegments |
| log.deleteOldSegments() |
| assertEquals(numSegments, log.numberOfSegments) |
| assertEquals(0L, log.logStartOffset) |
| |
| // only segments with offset before the current high watermark are eligible for deletion |
| for (hw <- 25 to 30) { |
| log.onHighWatermarkIncremented(hw) |
| log.deleteOldSegments() |
| assertTrue(log.logStartOffset <= hw) |
| log.logSegments.foreach { segment => |
| val segmentFetchInfo = segment.read(startOffset = segment.baseOffset, maxOffset = None, maxSize = Int.MaxValue) |
| val segmentLastOffsetOpt = segmentFetchInfo.records.records.asScala.lastOption.map(_.offset) |
| segmentLastOffsetOpt.foreach { lastOffset => |
| assertTrue(lastOffset >= hw) |
| } |
| } |
| } |
| |
| // expire all segments |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) |
| assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size) |
| assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 100) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.delete() |
| assertEquals("The number of segments should be 0", 0, log.numberOfSegments) |
| assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments()) |
| assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size) |
| } |
| |
| @Test |
| def testLogDeletionAfterClose() { |
| def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) |
| assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size) |
| |
| log.close() |
| log.delete() |
| assertEquals("The number of segments should be 0", 0, log.numberOfSegments) |
| assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size) |
| } |
| |
| @Test |
| def testLogDeletionAfterDeleteRecords() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5) |
| val log = createLog(logDir, logConfig) |
| |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| assertEquals("should have 3 segments", 3, log.numberOfSegments) |
| assertEquals(log.logStartOffset, 0) |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| |
| log.maybeIncrementLogStartOffset(1) |
| log.deleteOldSegments() |
| assertEquals("should have 3 segments", 3, log.numberOfSegments) |
| assertEquals(log.logStartOffset, 1) |
| |
| log.maybeIncrementLogStartOffset(6) |
| log.deleteOldSegments() |
| assertEquals("should have 2 segments", 2, log.numberOfSegments) |
| assertEquals(log.logStartOffset, 6) |
| |
| log.maybeIncrementLogStartOffset(15) |
| log.deleteOldSegments() |
| assertEquals("should have 1 segments", 1, log.numberOfSegments) |
| assertEquals(log.logStartOffset, 15) |
| } |
| |
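| /** Convenience accessor for the log's leader epoch cache (throws if the cache has not been created). */ |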
| def epochCache(log: Log): LeaderEpochFileCache = { |
| log.leaderEpochCache.get |
| } |
| |
| @Test |
| def shouldDeleteSizeBasedSegments() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("should have 2 segments", 2,log.numberOfSegments) |
| } |
| |
| @Test |
| def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("should have 3 segments", 3,log.numberOfSegments) |
| } |
| |
| @Test |
| def shouldDeleteTimeBasedSegmentsReadyToBeDeleted() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments) |
| } |
| |
| @Test |
| def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000) |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("There should be 3 segments remaining", 3, log.numberOfSegments) |
| } |
| |
| @Test |
| def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact") |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| // mark the oldest segment as older than retention.ms |
| log.logSegments.head.lastModified = mockTime.milliseconds - 20000 |
| |
| val segments = log.numberOfSegments |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("There should be 3 segments remaining", segments, log.numberOfSegments) |
| } |
| |
| @Test |
| def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete") |
| val log = createLog(logDir, logConfig) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("There should be 1 segment remaining", 1, log.numberOfSegments) |
| } |
| |
| @Test |
| def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = { |
| def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) |
| val recordsPerSegment = 5 |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact") |
| val log = createLog(logDir, logConfig, brokerTopicStats) |
| |
| // append some messages to create some segments |
| for (_ <- 0 until 15) |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| |
| // Three segments should be created |
| assertEquals(3, log.logSegments.count(_ => true)) |
| log.maybeIncrementLogStartOffset(recordsPerSegment) |
| |
| // The first segment, which is entirely before the log start offset, should be deleted |
| // Of the remaining segments, the first can overlap the log start offset and the rest must have a base offset |
| // greater than the start offset |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals("There should be 2 segments remaining", 2, log.numberOfSegments) |
| assertTrue(log.logSegments.head.baseOffset <= log.logStartOffset) |
| assertTrue(log.logSegments.tail.forall(s => s.baseOffset > log.logStartOffset)) |
| } |
| |
| @Test |
| def shouldApplyEpochToMessageOnAppendIfLeader() { |
| val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes)) |
| |
| //Given this partition is on leader epoch 72 |
| val epoch = 72 |
| val log = createLog(logDir, LogConfig()) |
| log.maybeAssignEpochStartOffset(epoch, records.size) |
| |
| //When appending messages as a leader (i.e. assignOffsets = true) |
| for (record <- records) |
| log.appendAsLeader( |
| MemoryRecords.withRecords(CompressionType.NONE, record), |
| leaderEpoch = epoch |
| ) |
| |
| //Then leader epoch should be set on messages |
| for (i <- records.indices) { |
| val read = readLog(log, i, 100, Some(i+1)).records.batches.iterator.next() |
| assertEquals("Should have set leader epoch", 72, read.partitionLeaderEpoch) |
| } |
| } |
| |
| @Test |
| def followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() { |
| val messageIds = (0 until 50).toArray |
| val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) |
| |
| //Given each message has an offset & epoch, as messages from a leader would |
| def recordsForEpoch(i: Int): MemoryRecords = { |
| val recs = MemoryRecords.withRecords(messageIds(i), CompressionType.NONE, records(i)) |
| recs.batches.asScala.foreach{record => |
| record.setPartitionLeaderEpoch(42) |
| record.setLastOffset(i) |
| } |
| recs |
| } |
| |
| val log = createLog(logDir, LogConfig()) |
| |
| //When appending as follower (assignOffsets = false) |
| for (i <- records.indices) |
| log.appendAsFollower(recordsForEpoch(i)) |
| |
| assertEquals(Some(42), log.latestEpoch) |
| } |
| |
| @Test |
| def shouldTruncateLeaderEpochsWhenDeletingSegments() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) |
| val log = createLog(logDir, logConfig) |
| val cache = epochCache(log) |
| |
| // Given three segments of 5 messages each |
| for (_ <- 0 until 15) { |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| } |
| |
| //Given epochs |
| cache.assign(0, 0) |
| cache.assign(1, 5) |
| cache.assign(2, 10) |
| |
| //When first segment is removed |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| |
| //The oldest epoch entry should have been removed |
| assertEquals(ListBuffer(EpochEntry(1, 5), EpochEntry(2, 10)), cache.epochEntries) |
| } |
| |
| @Test |
| def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() { |
| def createRecords = TestUtils.singletonRecords("test".getBytes) |
| val logConfig = LogTest.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) |
| val log = createLog(logDir, logConfig) |
| val cache = epochCache(log) |
| |
| // Given three segments of 5 messages each |
| for (_ <- 0 until 15) { |
| log.appendAsLeader(createRecords, leaderEpoch = 0) |
| } |
| |
| //Given epochs |
| cache.assign(0, 0) |
| cache.assign(1, 7) |
| cache.assign(2, 10) |
| |
| //When first segment removed (up to offset 5) |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| |
| //The first entry should have gone from (0,0) => (0,5) |
| assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries) |
| } |
| |
| @Test |
| def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() { |
| def createRecords(startOffset: Long, epoch: Int): MemoryRecords = { |
| TestUtils.records(Seq(new SimpleRecord("value".getBytes)), |
| baseOffset = startOffset, partitionLeaderEpoch = epoch) |
| } |
| |
| val logConfig = LogTest.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes) |
| val log = createLog(logDir, logConfig) |
| val cache = epochCache(log) |
| |
| def append(epoch: Int, startOffset: Long, count: Int): Unit = { |
| for (i <- 0 until count) |
| log.appendAsFollower(createRecords(startOffset + i, epoch)) |
| } |
| |
| //Given 2 segments, 10 messages per segment |
| append(epoch = 0, startOffset = 0, count = 10) |
| append(epoch = 1, startOffset = 10, count = 6) |
| append(epoch = 2, startOffset = 16, count = 4) |
| |
| assertEquals(2, log.numberOfSegments) |
| assertEquals(20, log.logEndOffset) |
| |
| //When truncate to LEO (no op) |
| log.truncateTo(log.logEndOffset) |
| |
| //Then no change |
| assertEquals(3, cache.epochEntries.size) |
| |
| //When truncate |
| log.truncateTo(11) |
| |
| //Then the entry for the latest epoch should be removed |
| assertEquals(2, cache.epochEntries.size) |
| |
| //When truncate |
| log.truncateTo(10) |
| |
| //Then |
| assertEquals(1, cache.epochEntries.size) |
| |
| //When truncate all |
| log.truncateTo(0) |
| |
| //Then |
| assertEquals(0, cache.epochEntries.size) |
| } |
| |
| /** |
| * Append a bunch of messages to a log and then re-open it with recovery and check that the leader epochs are recovered properly. |
| */ |
| @Test |
| def testLogRecoversForLeaderEpoch() { |
| val log = createLog(logDir, LogConfig()) |
| val leaderEpochCache = epochCache(log) |
| val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0) |
| log.appendAsFollower(records = firstBatch) |
| |
| val secondBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 1) |
| log.appendAsFollower(records = secondBatch) |
| |
| val thirdBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 2, offset = 2) |
| log.appendAsFollower(records = thirdBatch) |
| |
| val fourthBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 3, offset = 3) |
| log.appendAsFollower(records = fourthBatch) |
| |
| assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) |
| |
| // deliberately remove some of the epoch entries |
| leaderEpochCache.truncateFromEnd(2) |
| assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) |
| log.close() |
| |
| // reopen the log and recover from the beginning |
| val recoveredLog = createLog(logDir, LogConfig()) |
| val recoveredLeaderEpochCache = epochCache(recoveredLog) |
| |
| // epoch entries should be recovered |
| assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), recoveredLeaderEpochCache.epochEntries) |
| recoveredLog.close() |
| } |
| |
| /** |
| * Wrap a single record log buffer with leader epoch. |
| */ |
| private def singletonRecordsWithLeaderEpoch(value: Array[Byte], |
| key: Array[Byte] = null, |
| leaderEpoch: Int, |
| offset: Long, |
| codec: CompressionType = CompressionType.NONE, |
| timestamp: Long = RecordBatch.NO_TIMESTAMP, |
| magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): MemoryRecords = { |
| val records = Seq(new SimpleRecord(timestamp, key, value)) |
| |
| val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) |
| val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, offset, |
| System.currentTimeMillis, leaderEpoch) |
| records.foreach(builder.append) |
| builder.build() |
| } |
| |
| @Test |
| def testFirstUnstableOffsetNoTransactionalData() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val records = MemoryRecords.withRecords(CompressionType.NONE, |
| new SimpleRecord("foo".getBytes), |
| new SimpleRecord("bar".getBytes), |
| new SimpleRecord("baz".getBytes)) |
| |
| log.appendAsLeader(records, leaderEpoch = 0) |
| assertEquals(None, log.firstUnstableOffset) |
| } |
| |
| @Test |
| def testFirstUnstableOffsetWithTransactionalData() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val pid = 137L |
| val epoch = 5.toShort |
| var seq = 0 |
| |
| // add some transactional records |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid, epoch, seq, |
| new SimpleRecord("foo".getBytes), |
| new SimpleRecord("bar".getBytes), |
| new SimpleRecord("baz".getBytes)) |
| |
| val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0) |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // add more transactional records |
| seq += 3 |
| log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid, epoch, seq, |
| new SimpleRecord("blah".getBytes)), leaderEpoch = 0) |
| |
| // LSO should not have changed |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // now transaction is committed |
| val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid, epoch), |
| isFromClient = false, leaderEpoch = 0) |
| |
| // first unstable offset is not updated until the high watermark is advanced |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1) |
| |
| // now there should be no first unstable offset |
| assertEquals(None, log.firstUnstableOffset) |
| } |
| |
| @Test |
| def testTransactionIndexUpdated(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch = 0.toShort |
| |
| val pid1 = 1L |
| val pid2 = 2L |
| val pid3 = 3L |
| val pid4 = 4L |
| |
| val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch) |
| val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch) |
| val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch) |
| val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch) |
| |
| // mix transactional and non-transactional data |
| appendPid1(5) // nextOffset: 5 |
| appendNonTransactionalAsLeader(log, 3) // 8 |
| appendPid2(2) // 10 |
| appendPid1(4) // 14 |
| appendPid3(3) // 17 |
| appendNonTransactionalAsLeader(log, 2) // 19 |
| appendPid1(10) // 29 |
| appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30 |
| appendPid2(6) // 36 |
| appendPid4(3) // 39 |
| appendNonTransactionalAsLeader(log, 10) // 49 |
| appendPid3(9) // 58 |
| appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59 |
| appendPid4(8) // 67 |
| appendPid2(7) // 74 |
| appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75 |
| appendNonTransactionalAsLeader(log, 10) // 85 |
| appendPid4(4) // 89 |
| appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 |
| |
| val abortedTransactions = allAbortedTransactions(log) |
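| // Each AbortedTxn captures (producerId, firstOffset, lastOffset, lastStableOffset): pid1 aborted at offset 29 while |
| // the last stable offset was 8, and pid2 aborted at offset 74 while the last stable offset was 36. |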
| assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) |
| } |
| |
| @Test |
| def testFullTransactionIndexRecovery(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch = 0.toShort |
| |
| val pid1 = 1L |
| val pid2 = 2L |
| val pid3 = 3L |
| val pid4 = 4L |
| |
| val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch) |
| val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch) |
| val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch) |
| val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch) |
| |
| // mix transactional and non-transactional data |
| appendPid1(5) // nextOffset: 5 |
| appendNonTransactionalAsLeader(log, 3) // 8 |
| appendPid2(2) // 10 |
| appendPid1(4) // 14 |
| appendPid3(3) // 17 |
| appendNonTransactionalAsLeader(log, 2) // 19 |
| appendPid1(10) // 29 |
| appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30 |
| appendPid2(6) // 36 |
| appendPid4(3) // 39 |
| appendNonTransactionalAsLeader(log, 10) // 49 |
| appendPid3(9) // 58 |
| appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59 |
| appendPid4(8) // 67 |
| appendPid2(7) // 74 |
| appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75 |
| appendNonTransactionalAsLeader(log, 10) // 85 |
| appendPid4(4) // 89 |
| appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 |
| |
| // delete all the offset and transaction index files to force recovery |
| log.logSegments.foreach { segment => |
| segment.offsetIndex.deleteIfExists() |
| segment.txnIndex.deleteIfExists() |
| } |
| |
| log.close() |
| |
| val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5) |
| val reloadedLog = createLog(logDir, reloadedLogConfig) |
| val abortedTransactions = allAbortedTransactions(reloadedLog) |
| assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) |
| } |
| |
| @Test |
| def testRecoverOnlyLastSegment(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch = 0.toShort |
| |
| val pid1 = 1L |
| val pid2 = 2L |
| val pid3 = 3L |
| val pid4 = 4L |
| |
| val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch) |
| val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch) |
| val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch) |
| val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch) |
| |
| // mix transactional and non-transactional data |
| appendPid1(5) // nextOffset: 5 |
| appendNonTransactionalAsLeader(log, 3) // 8 |
| appendPid2(2) // 10 |
| appendPid1(4) // 14 |
| appendPid3(3) // 17 |
| appendNonTransactionalAsLeader(log, 2) // 19 |
| appendPid1(10) // 29 |
| appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30 |
| appendPid2(6) // 36 |
| appendPid4(3) // 39 |
| appendNonTransactionalAsLeader(log, 10) // 49 |
| appendPid3(9) // 58 |
| appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59 |
| appendPid4(8) // 67 |
| appendPid2(7) // 74 |
| appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75 |
| appendNonTransactionalAsLeader(log, 10) // 85 |
| appendPid4(4) // 89 |
| appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 |
| |
| // delete the last offset and transaction index files to force recovery |
| val lastSegment = log.logSegments.last |
| val recoveryPoint = lastSegment.baseOffset |
| lastSegment.offsetIndex.deleteIfExists() |
| lastSegment.txnIndex.deleteIfExists() |
| |
| log.close() |
| |
| val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5) |
| val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint) |
| val abortedTransactions = allAbortedTransactions(reloadedLog) |
| assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) |
| } |
| |
| @Test |
| def testRecoverLastSegmentWithNoSnapshots(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 128 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch = 0.toShort |
| |
| val pid1 = 1L |
| val pid2 = 2L |
| val pid3 = 3L |
| val pid4 = 4L |
| |
| val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch) |
| val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch) |
| val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch) |
| val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch) |
| |
| // mix transactional and non-transactional data |
| appendPid1(5) // nextOffset: 5 |
| appendNonTransactionalAsLeader(log, 3) // 8 |
| appendPid2(2) // 10 |
| appendPid1(4) // 14 |
| appendPid3(3) // 17 |
| appendNonTransactionalAsLeader(log, 2) // 19 |
| appendPid1(10) // 29 |
| appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30 |
| appendPid2(6) // 36 |
| appendPid4(3) // 39 |
| appendNonTransactionalAsLeader(log, 10) // 49 |
| appendPid3(9) // 58 |
| appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59 |
| appendPid4(8) // 67 |
| appendPid2(7) // 74 |
| appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75 |
| appendNonTransactionalAsLeader(log, 10) // 85 |
| appendPid4(4) // 89 |
| appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90 |
| |
| deleteProducerSnapshotFiles() |
| |
| // delete the last offset and transaction index files to force recovery. this should force us to rebuild |
| // the producer state from the start of the log |
| val lastSegment = log.logSegments.last |
| val recoveryPoint = lastSegment.baseOffset |
| lastSegment.offsetIndex.deleteIfExists() |
| lastSegment.txnIndex.deleteIfExists() |
| |
| log.close() |
| |
| val reloadedLogConfig = LogTest.createLogConfig(segmentBytes = 1024 * 5) |
| val reloadedLog = createLog(logDir, reloadedLogConfig, recoveryPoint = recoveryPoint) |
| val abortedTransactions = allAbortedTransactions(reloadedLog) |
| assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) |
| } |
| |
| @Test |
| def testTransactionIndexUpdatedThroughReplication(): Unit = { |
| val epoch = 0.toShort |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| val buffer = ByteBuffer.allocate(2048) |
| |
| val pid1 = 1L |
| val pid2 = 2L |
| val pid3 = 3L |
| val pid4 = 4L |
| |
| val appendPid1 = appendTransactionalToBuffer(buffer, pid1, epoch) |
| val appendPid2 = appendTransactionalToBuffer(buffer, pid2, epoch) |
| val appendPid3 = appendTransactionalToBuffer(buffer, pid3, epoch) |
| val appendPid4 = appendTransactionalToBuffer(buffer, pid4, epoch) |
| |
| appendPid1(0L, 5) |
| appendNonTransactionalToBuffer(buffer, 5L, 3) |
| appendPid2(8L, 2) |
| appendPid1(10L, 4) |
| appendPid3(14L, 3) |
| appendNonTransactionalToBuffer(buffer, 17L, 2) |
| appendPid1(19L, 10) |
| appendEndTxnMarkerToBuffer(buffer, pid1, epoch, 29L, ControlRecordType.ABORT) |
| appendPid2(30L, 6) |
| appendPid4(36L, 3) |
| appendNonTransactionalToBuffer(buffer, 39L, 10) |
| appendPid3(49L, 9) |
| appendEndTxnMarkerToBuffer(buffer, pid3, epoch, 58L, ControlRecordType.COMMIT) |
| appendPid4(59L, 8) |
| appendPid2(67L, 7) |
| appendEndTxnMarkerToBuffer(buffer, pid2, epoch, 74L, ControlRecordType.ABORT) |
| appendNonTransactionalToBuffer(buffer, 75L, 10) |
| appendPid4(85L, 4) |
| appendEndTxnMarkerToBuffer(buffer, pid4, epoch, 89L, ControlRecordType.COMMIT) |
| |
| buffer.flip() |
| |
| appendAsFollower(log, MemoryRecords.readableRecords(buffer)) |
| |
| val abortedTransactions = allAbortedTransactions(log) |
| assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions) |
| } |
| |
| @Test(expected = classOf[TransactionCoordinatorFencedException]) |
| def testZombieCoordinatorFenced(): Unit = { |
| val pid = 1L |
| val epoch = 0.toShort |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val append = appendTransactionalAsLeader(log, pid, epoch) |
| |
| append(10) |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) |
| |
| append(5) |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, coordinatorEpoch = 2) |
| |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) |
| } |
| |
| @Test |
| def testZombieCoordinatorFencedEmptyTransaction(): Unit = { |
| val pid = 1L |
| val epoch = 0.toShort |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val buffer = ByteBuffer.allocate(256) |
| val append = appendTransactionalToBuffer(buffer, pid, epoch, leaderEpoch = 1) |
| append(0, 10) |
| appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT, |
| coordinatorEpoch = 0, leaderEpoch = 1) |
| |
| buffer.flip() |
| log.appendAsFollower(MemoryRecords.readableRecords(buffer)) |
| |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1) |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1) |
| assertThrows[TransactionCoordinatorFencedException] { |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1, leaderEpoch = 1) |
| } |
| } |
| |
| @Test |
| def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch = 0.toShort |
| val pid = 1L |
| val appendPid = appendTransactionalAsLeader(log, pid, epoch) |
| |
| appendPid(5) |
| appendNonTransactionalAsLeader(log, 3) |
| assertEquals(8L, log.logEndOffset) |
| |
| log.roll() |
| assertEquals(2, log.logSegments.size) |
| appendPid(5) |
| |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| log.maybeIncrementLogStartOffset(5L) |
| |
| // the first unstable offset should be lower bounded by the log start offset |
| assertEquals(Some(5L), log.firstUnstableOffset.map(_.messageOffset)) |
| } |
| |
| @Test |
| def testFirstUnstableOffsetDoesNotExceedLogStartOffsetAfterSegmentDeletion(): Unit = { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| val epoch = 0.toShort |
| val pid = 1L |
| val appendPid = appendTransactionalAsLeader(log, pid, epoch) |
| |
| appendPid(5) |
| appendNonTransactionalAsLeader(log, 3) |
| assertEquals(8L, log.logEndOffset) |
| |
| log.roll() |
| assertEquals(2, log.logSegments.size) |
| appendPid(5) |
| |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| log.maybeIncrementLogStartOffset(8L) |
| log.onHighWatermarkIncremented(log.logEndOffset) |
| log.deleteOldSegments() |
| assertEquals(1, log.logSegments.size) |
| |
| // the first unstable offset should be lower bounded by the log start offset |
| assertEquals(Some(8L), log.firstUnstableOffset.map(_.messageOffset)) |
| } |
| |
| @Test |
| def testAppendToTransactionIndexFailure(): Unit = { |
| val pid = 1L |
| val epoch = 0.toShort |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| val append = appendTransactionalAsLeader(log, pid, epoch) |
| append(10) |
| |
| // Kind of a hack, but renaming the index to a directory ensures that the append |
| // to the index will fail. |
| log.activeSegment.txnIndex.renameTo(log.dir) |
| |
| // The append will be written to the log successfully, but the write to the index will fail |
| assertThrows[KafkaStorageException] { |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) |
| } |
| assertEquals(11L, log.logEndOffset) |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // Try the append a second time. The appended offset in the log should still increase. |
| assertThrows[KafkaStorageException] { |
| appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) |
| } |
| assertEquals(12L, log.logEndOffset) |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // Even if the high watermark is updated, the first unstable offset does not move |
| log.onHighWatermarkIncremented(12L) |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| log.close() |
| |
| val reopenedLog = createLog(logDir, logConfig) |
| assertEquals(12L, reopenedLog.logEndOffset) |
| assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size) |
| reopenedLog.onHighWatermarkIncremented(12L) |
| assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset)) |
| } |
| |
| @Test |
| def testLastStableOffsetWithMixedProducerData() { |
| val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) |
| val log = createLog(logDir, logConfig) |
| |
| // for convenience, both producers share the same epoch |
| val epoch = 5.toShort |
| |
| val pid1 = 137L |
| val seq1 = 0 |
| val pid2 = 983L |
| val seq2 = 0 |
| |
| // add some transactional records |
| val firstAppendInfo = log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid1, epoch, seq1, |
| new SimpleRecord("a".getBytes), |
| new SimpleRecord("b".getBytes), |
| new SimpleRecord("c".getBytes)), leaderEpoch = 0) |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // mix in some non-transactional data |
| log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, |
| new SimpleRecord("g".getBytes), |
| new SimpleRecord("h".getBytes), |
| new SimpleRecord("i".getBytes)), leaderEpoch = 0) |
| |
| // append data from a second transactional producer |
| val secondAppendInfo = log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid2, epoch, seq2, |
| new SimpleRecord("d".getBytes), |
| new SimpleRecord("e".getBytes), |
| new SimpleRecord("f".getBytes)), leaderEpoch = 0) |
| |
| // LSO should not have changed |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // now first producer's transaction is aborted |
| val abortAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid1, epoch), |
| isFromClient = false, leaderEpoch = 0) |
| log.onHighWatermarkIncremented(abortAppendInfo.lastOffset + 1) |
| |
| // the first unstable offset should now point to the start of the second (still open) transaction |
| assertEquals(Some(secondAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // commit the second transaction |
| val commitAppendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.COMMIT, pid2, epoch), |
| isFromClient = false, leaderEpoch = 0) |
| log.onHighWatermarkIncremented(commitAppendInfo.lastOffset + 1) |
| |
| // now there should be no first unstable offset |
| assertEquals(None, log.firstUnstableOffset) |
| } |
| |
| @Test |
| def testAbortedTransactionSpanningMultipleSegments() { |
| val pid = 137L |
| val epoch = 5.toShort |
| var seq = 0 |
| |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid, epoch, seq, |
| new SimpleRecord("a".getBytes), |
| new SimpleRecord("b".getBytes), |
| new SimpleRecord("c".getBytes)) |
| |
| val logConfig = LogTest.createLogConfig(segmentBytes = records.sizeInBytes) |
| val log = createLog(logDir, logConfig) |
| |
| val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0) |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset)) |
| |
| // this write should spill to the second segment |
| seq = 3 |
| log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid, epoch, seq, |
| new SimpleRecord("d".getBytes), |
| new SimpleRecord("e".getBytes), |
| new SimpleRecord("f".getBytes)), leaderEpoch = 0) |
| assertEquals(Some(firstAppendInfo.firstOffset.get), log.firstUnstableOffset.map(_.messageOffset)) |
| assertEquals(Some(0L), log.firstUnstableOffset.map(_.segmentBaseOffset)) |
| assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset) |
| |
| // now abort the transaction |
| val appendInfo = log.appendAsLeader(endTxnRecords(ControlRecordType.ABORT, pid, epoch), |
| isFromClient = false, leaderEpoch = 0) |
| log.onHighWatermarkIncremented(appendInfo.lastOffset + 1) |
| assertEquals(None, log.firstUnstableOffset.map(_.messageOffset)) |
| |
| // now check that a fetch includes the aborted transaction |
| val fetchDataInfo = log.read(0L, 2048, maxOffset = None, minOneMessage = true, includeAbortedTxns = true) |
| assertTrue(fetchDataInfo.abortedTransactions.isDefined) |
| assertEquals(1, fetchDataInfo.abortedTransactions.get.size) |
| assertEquals(new AbortedTransaction(pid, 0), fetchDataInfo.abortedTransactions.get.head) |
| } |
| |
| @Test |
| def testLoadPartitionDirWithNoSegmentsShouldNotThrow() { |
| val dirName = Log.logDeleteDirName(new TopicPartition("foo", 3)) |
| val logDir = new File(tmpDir, dirName) |
| logDir.mkdirs() |
| val logConfig = LogTest.createLogConfig() |
| val log = createLog(logDir, logConfig) |
| assertEquals(1, log.numberOfSegments) |
| } |
| |
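| // Collects the aborted transactions recorded in each segment's transaction index, in log order. |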
| private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns) |
| |
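| /** |
| * Returns a function that appends the given number of transactional records for the producer as leader, |
| * keeping track of the produce sequence number across invocations. |
| */ |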
| private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = { |
| var sequence = 0 |
| numRecords: Int => { |
| val simpleRecords = (sequence until sequence + numRecords).map { seq => |
| new SimpleRecord(s"$seq".getBytes) |
| } |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, |
| producerEpoch, sequence, simpleRecords: _*) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| sequence += numRecords |
| } |
| } |
| |
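| /** Appends a transaction end marker (COMMIT or ABORT) for the producer, written as a non-client (coordinator) append. */ |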
| private def appendEndTxnMarkerAsLeader(log: Log, |
| producerId: Long, |
| producerEpoch: Short, |
| controlType: ControlRecordType, |
| coordinatorEpoch: Int = 0, |
| leaderEpoch: Int = 0): Unit = { |
| val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch) |
| log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch) |
| } |
| |
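| /** Appends a batch of plain, non-transactional records as leader. */ |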
| private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = { |
| val simpleRecords = (0 until numRecords).map { seq => |
| new SimpleRecord(s"$seq".getBytes) |
| } |
| val records = MemoryRecords.withRecords(CompressionType.NONE, simpleRecords: _*) |
| log.appendAsLeader(records, leaderEpoch = 0) |
| } |
| |
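| /** |
| * Returns a function that writes a transactional batch with an explicit base offset into the buffer, |
| * mimicking batches a follower would receive from the leader. |
| */ |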
| private def appendTransactionalToBuffer(buffer: ByteBuffer, |
| producerId: Long, |
| producerEpoch: Short, |
| leaderEpoch: Int = 0): (Long, Int) => Unit = { |
| var sequence = 0 |
| (offset: Long, numRecords: Int) => { |
| val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, |
| offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch) |
| for (seq <- sequence until sequence + numRecords) { |
| val record = new SimpleRecord(s"$seq".getBytes) |
| builder.append(record) |
| } |
| |
| sequence += numRecords |
| builder.close() |
| } |
| } |
| |
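| /** Writes a transaction end marker at the given offset into the buffer. */ |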
| private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, |
| producerId: Long, |
| producerEpoch: Short, |
| offset: Long, |
| controlType: ControlRecordType, |
| coordinatorEpoch: Int = 0, |
| leaderEpoch: Int = 0): Unit = { |
| val marker = new EndTransactionMarker(controlType, coordinatorEpoch) |
| MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker) |
| } |
| |
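| /** Writes a non-transactional batch starting at the given offset into the buffer. */ |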
| private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = { |
| val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, offset) |
| (0 until numRecords).foreach { seq => |
| builder.append(new SimpleRecord(s"$seq".getBytes)) |
| } |
| builder.close() |
| } |
| |
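| /** Stamps the given leader epoch onto every batch and appends the records as a follower. */ |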
| private def appendAsFollower(log: Log, records: MemoryRecords, leaderEpoch: Int = 0): Unit = { |
| records.batches.asScala.foreach(_.setPartitionLeaderEpoch(leaderEpoch)) |
| log.appendAsFollower(records) |
| } |
| |
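| /** Creates the clean shutdown marker file in the parent log directory so that the next log load can skip recovery. */ |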
| private def createCleanShutdownFile(): File = { |
| val parentLogDir = logDir.getParentFile |
| assertTrue("Data directory %s must exist", parentLogDir.isDirectory) |
| val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile) |
| cleanShutdownFile.createNewFile() |
| assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists()) |
| cleanShutdownFile |
| } |
| |
| private def deleteProducerSnapshotFiles(): Unit = { |
| val files = logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.ProducerSnapshotFileSuffix)) |
| files.foreach(Utils.delete) |
| } |
| |
| private def listProducerSnapshotOffsets: Seq[Long] = |
| ProducerStateManager.listSnapshotFiles(logDir).map(Log.offsetFromFile).sorted |
| |
| private def createLog(dir: File, |
| config: LogConfig, |
| brokerTopicStats: BrokerTopicStats = brokerTopicStats, |
| logStartOffset: Long = 0L, |
| recoveryPoint: Long = 0L, |
| scheduler: Scheduler = mockTime.scheduler, |
| time: Time = mockTime, |
| maxProducerIdExpirationMs: Int = 60 * 60 * 1000, |
| producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs): Log = { |
| LogTest.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint, |
| maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs) |
| } |
| |
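| /** Creates a log over a directory pre-populated with a segment containing overflowed offsets and returns the log together with that segment. */ |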
| private def createLogWithOffsetOverflow(logConfig: LogConfig): (Log, LogSegment) = { |
| LogTest.initializeLogDirWithOverflowedSegment(logDir) |
| |
| val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue) |
| val segmentWithOverflow = LogTest.firstOverflowSegment(log).getOrElse { |
| Assertions.fail("Failed to create log with a segment which has overflowed offsets") |
| } |
| |
| (log, segmentWithOverflow) |
| } |
| |
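| /** Reloads the log from logDir, running recovery, and verifies that the expected keys are present afterwards. */ |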
| private def recoverAndCheck(config: LogConfig, |
| expectedKeys: Iterable[Long], |
| expectDeletedFiles: Boolean = true): Log = { |
| LogTest.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler, |
| expectDeletedFiles) |
| } |
| |
| private def readLog(log: Log, startOffset: Long, maxLength: Int, |
| maxOffset: Option[Long] = None, |
| minOneMessage: Boolean = true): FetchDataInfo = { |
| log.read(startOffset, maxLength, maxOffset, minOneMessage, includeAbortedTxns = false) |
| } |
| |
| } |
| |
| object LogTest { |
| def createLogConfig(segmentMs: Long = Defaults.SegmentMs, |
| segmentBytes: Int = Defaults.SegmentSize, |
| retentionMs: Long = Defaults.RetentionMs, |
| retentionBytes: Long = Defaults.RetentionSize, |
| segmentJitterMs: Long = Defaults.SegmentJitterMs, |
| cleanupPolicy: String = Defaults.CleanupPolicy, |
| maxMessageBytes: Int = Defaults.MaxMessageSize, |
| indexIntervalBytes: Int = Defaults.IndexInterval, |
| segmentIndexBytes: Int = Defaults.MaxIndexSize, |
| messageFormatVersion: String = Defaults.MessageFormatVersion, |
| fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = { |
| val logProps = new Properties() |
| |
| logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long) |
| logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer) |
| logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long) |
| logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long) |
| logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long) |
| logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy) |
| logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer) |
| logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer) |
| logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer) |
| logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion) |
| logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long) |
| LogConfig(logProps) |
| } |
| |
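| /**
| * Instantiate a Log over the given directory and config using a fresh LogDirFailureChannel,
| * with producer id expiration settings left at their defaults unless overridden.
| */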
| def createLog(dir: File, |
| config: LogConfig, |
| brokerTopicStats: BrokerTopicStats, |
| scheduler: Scheduler, |
| time: Time, |
| logStartOffset: Long = 0L, |
| recoveryPoint: Long = 0L, |
| maxProducerIdExpirationMs: Int = 60 * 60 * 1000, |
| producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs): Log = { |
| Log(dir = dir, |
| config = config, |
| logStartOffset = logStartOffset, |
| recoveryPoint = recoveryPoint, |
| scheduler = scheduler, |
| brokerTopicStats = brokerTopicStats, |
| time = time, |
| maxProducerIdExpirationMs = maxProducerIdExpirationMs, |
| producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs, |
| logDirFailureChannel = new LogDirFailureChannel(10)) |
| } |
| |
| /** |
| * Check if the given log contains any segment with records that cause offset overflow. |
| * @param log Log to check |
| * @return true if log contains at least one segment with offset overflow; false otherwise |
| */ |
| def hasOffsetOverflow(log: Log): Boolean = firstOverflowSegment(log).isDefined |
| |
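| /**
| * Return the first segment, if any, containing a batch whose offsets fall outside the valid
| * range for that segment: a last offset more than Int.MaxValue above the segment's base offset,
| * or a base offset below it.
| */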
| def firstOverflowSegment(log: Log): Option[LogSegment] = { |
| def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean = |
| batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < baseOffset |
| |
| for (segment <- log.logSegments) { |
| val overflowBatch = segment.log.batches.asScala.find(batch => hasOverflow(segment.baseOffset, batch)) |
| if (overflowBatch.isDefined) |
| return Some(segment) |
| } |
| None |
| } |
| |
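| /** Open the segment file for the given base offset in logDir as raw FileRecords. */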
| private def rawSegment(logDir: File, baseOffset: Long): FileRecords = |
| FileRecords.open(Log.logFile(logDir, baseOffset)) |
| |
| /**
| * Initialize the given log directory with a set of segments, one of which contains offsets
| * that overflow the segment, i.e. offsets more than Int.MaxValue above its base offset.
| */
| def initializeLogDirWithOverflowedSegment(logDir: File): Unit = { |
| def writeSampleBatches(baseOffset: Long, segment: FileRecords): Long = { |
| def record(offset: Long) = { |
| val data = offset.toString.getBytes |
| new SimpleRecord(data, data) |
| } |
| |
| segment.append(MemoryRecords.withRecords(baseOffset, CompressionType.NONE, 0, |
| record(baseOffset))) |
| segment.append(MemoryRecords.withRecords(baseOffset + 1, CompressionType.NONE, 0, |
| record(baseOffset + 1), |
| record(baseOffset + 2))) |
| segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0, |
| record(baseOffset + Int.MaxValue - 1))) |
| // Create the index files explicitly; otherwise loading the log would trigger segment recovery,
| // which would truncate the segment.
| Log.offsetIndexFile(logDir, baseOffset).createNewFile() |
| Log.timeIndexFile(logDir, baseOffset).createNewFile() |
| baseOffset + Int.MaxValue |
| } |
| |
| def writeNormalSegment(baseOffset: Long): Long = { |
| val segment = rawSegment(logDir, baseOffset) |
| try writeSampleBatches(baseOffset, segment) |
| finally segment.close() |
| } |
| |
| def writeOverflowSegment(baseOffset: Long): Long = { |
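| // Writing two rounds of sample batches into the same segment file pushes the second round's
| // offsets past baseOffset + Int.MaxValue, which is what produces the overflow.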
| val segment = rawSegment(logDir, baseOffset) |
| try { |
| val nextOffset = writeSampleBatches(baseOffset, segment) |
| writeSampleBatches(nextOffset, segment) |
| } finally segment.close() |
| } |
| |
| // We create three segments; the second contains offsets that overflow its base offset by more than Int.MaxValue
| var nextOffset = 0L |
| nextOffset = writeNormalSegment(nextOffset) |
| nextOffset = writeOverflowSegment(nextOffset) |
| writeNormalSegment(nextOffset) |
| } |
| |
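| /** Collect every record in the log, in order, by walking all segments and their batches. */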
| def allRecords(log: Log): List[Record] = { |
| val recordsFound = ListBuffer[Record]() |
| for (logSegment <- log.logSegments) { |
| for (batch <- logSegment.log.batches.asScala) { |
| recordsFound ++= batch.iterator().asScala |
| } |
| } |
| recordsFound.toList |
| } |
| |
| def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = { |
| assertEquals(expectedRecords, allRecords(log)) |
| } |
| |
| /* Extract all the keys from a log, skipping control batches and records without a key or value. */
| def keysInLog(log: Log): Iterable[Long] = { |
| for (logSegment <- log.logSegments; |
| batch <- logSegment.log.batches.asScala if !batch.isControlBatch; |
| record <- batch.asScala if record.hasValue && record.hasKey) |
| yield TestUtils.readString(record.key).toLong |
| } |
| |
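| /**
| * Re-open the log at logDir with the given config and verify that recovery restored the expected
| * keys, that temporary files (.cleaned, .swap and, unless expectDeletedFiles is set, .deleted)
| * are gone once fileDeleteDelayMs has elapsed, and that no segment still has overflowed offsets.
| */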
| def recoverAndCheck(logDir: File, |
| config: LogConfig, |
| expectedKeys: Iterable[Long], |
| brokerTopicStats: BrokerTopicStats, |
| time: Time, |
| scheduler: Scheduler, |
| expectDeletedFiles: Boolean = false): Log = { |
| // Recover log file and check that after recovery, keys are as expected |
| // and all temporary files have been deleted |
| val recoveredLog = createLog(logDir, config, brokerTopicStats, scheduler, time) |
| time.sleep(config.fileDeleteDelayMs + 1) |
| for (file <- logDir.listFiles) { |
| if (!expectDeletedFiles) |
| assertFalse("Unexpected .deleted file after recovery", file.getName.endsWith(Log.DeletedFileSuffix)) |
| assertFalse("Unexpected .cleaned file after recovery", file.getName.endsWith(Log.CleanedFileSuffix)) |
| assertFalse("Unexpected .swap file after recovery", file.getName.endsWith(Log.SwapFileSuffix)) |
| } |
| assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) |
| assertFalse(LogTest.hasOffsetOverflow(recoveredLog)) |
| recoveredLog |
| } |
| } |