| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.raft |
| |
| import kafka.log.UnifiedLog |
| import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp} |
| import kafka.server.{KafkaConfig, KafkaRaftServer} |
| import kafka.utils.TestUtils |
| import org.apache.kafka.common.errors.{InvalidConfigurationException, RecordTooLargeException} |
| import org.apache.kafka.common.protocol |
| import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable} |
| import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} |
| import org.apache.kafka.common.utils.Utils |
| import org.apache.kafka.raft._ |
| import org.apache.kafka.raft.internals.BatchBuilder |
| import org.apache.kafka.server.common.serialization.RecordSerde |
| import org.apache.kafka.server.util.MockTime |
| import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots} |
| import org.apache.kafka.storage.internals.log.{LogConfig, LogStartOffsetIncrementReason} |
| import org.apache.kafka.test.TestUtils.assertOptional |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} |
| |
| import java.io.File |
| import java.nio.ByteBuffer |
| import java.nio.file.{Files, Path} |
| import java.util |
| import java.util.{Collections, Optional, Properties} |
| |
| final class KafkaMetadataLogTest { |
| import KafkaMetadataLogTest._ |
| |
| var tempDir: File = _ |
| val mockTime = new MockTime() |
| |
| @BeforeEach |
| def setUp(): Unit = { |
| tempDir = TestUtils.tempDir() |
| } |
| |
| @AfterEach |
| def tearDown(): Unit = { |
| Utils.delete(tempDir) |
| } |
| |
| @Test |
| def testConfig(): Unit = { |
| val props = new Properties() |
| props.put(ProcessRolesProp, util.Arrays.asList("broker")) |
| props.put(QuorumVotersProp, "1@localhost:9093") |
| props.put(NodeIdProp, Int.box(2)) |
| props.put(KafkaConfig.ControllerListenerNamesProp, "SSL") |
| props.put(MetadataLogSegmentBytesProp, Int.box(10240)) |
| props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024)) |
| assertThrows(classOf[InvalidConfigurationException], () => { |
| val kafkaConfig = KafkaConfig.fromProps(props) |
| val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES) |
| buildMetadataLog(tempDir, mockTime, metadataConfig) |
| }) |
| |
| props.put(MetadataLogSegmentMinBytesProp, Int.box(10240)) |
| val kafkaConfig = KafkaConfig.fromProps(props) |
| val metadataConfig = MetadataLogConfig(kafkaConfig, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES) |
| buildMetadataLog(tempDir, mockTime, metadataConfig) |
| } |
| |
| @Test |
| def testUnexpectedAppendOffset(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val recordFoo = new SimpleRecord("foo".getBytes()) |
| val currentEpoch = 3 |
| val initialOffset = log.endOffset().offset |
| |
| log.appendAsLeader( |
| MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo), |
| currentEpoch |
| ) |
| |
| // Throw exception for out of order records |
| assertThrows( |
| classOf[RuntimeException], |
| () => { |
| log.appendAsLeader( |
| MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo), |
| currentEpoch |
| ) |
| } |
| ) |
| |
| assertThrows( |
| classOf[RuntimeException], |
| () => { |
| log.appendAsFollower( |
| MemoryRecords.withRecords(initialOffset, CompressionType.NONE, currentEpoch, recordFoo) |
| ) |
| } |
| ) |
| } |
| |
| @Test |
| def testCreateSnapshot(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertEquals(0, log.readSnapshot(snapshotId).get().sizeInBytes()) |
| } |
| |
| @Test |
| def testCreateSnapshotFromEndOffset(): Unit = { |
| val numberOfRecords = 10 |
| val firstEpoch = 1 |
| val secondEpoch = 3 |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, firstEpoch) |
| append(log, numberOfRecords, secondEpoch) |
| log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)) |
| |
| // Test finding the first epoch |
| log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch)).get().close() |
| log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch)).get().close() |
| log.createNewSnapshot(new OffsetAndEpoch(1, firstEpoch)).get().close() |
| |
| // Test finding the second epoch |
| log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch)).get().close() |
| log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch)).get().close() |
| log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch)).get().close() |
| } |
| |
| @Test |
| def testCreateSnapshotLaterThanHighWatermark(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch)) |
| ) |
| } |
| |
| @Test |
| def testCreateSnapshotMuchLaterEpoch(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, epoch + 1)) |
| ) |
| } |
| |
| @Test |
| def testCreateSnapshotBeforeLogStartOffset(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val snapshotId = new OffsetAndEpoch(numberOfRecords-4, epoch) |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| // Simulate log cleanup that advances the LSO |
| log.log.maybeIncrementLogStartOffset(snapshotId.offset - 1, LogStartOffsetIncrementReason.SegmentDeletion) |
| |
| assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset - 2, snapshotId.epoch))) |
| } |
| |
| @Test |
| def testCreateSnapshotDivergingEpoch(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 2 |
| val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch - 1)) |
| ) |
| } |
| |
| @Test |
| def testCreateSnapshotOlderEpoch(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 2 |
| val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch - 1)) |
| ) |
| } |
| |
| @Test |
| def testCreateSnapshotWithMissingEpoch(): Unit = { |
| val firstBatchRecords = 5 |
| val firstEpoch = 1 |
| val missingEpoch = firstEpoch + 1 |
| val secondBatchRecords = 5 |
| val secondEpoch = missingEpoch + 1 |
| |
| val numberOfRecords = firstBatchRecords + secondBatchRecords |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, firstBatchRecords, firstEpoch) |
| append(log, secondBatchRecords, secondEpoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(1, missingEpoch)) |
| ) |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(firstBatchRecords, missingEpoch)) |
| ) |
| assertThrows( |
| classOf[IllegalArgumentException], |
| () => log.createNewSnapshot(new OffsetAndEpoch(secondBatchRecords, missingEpoch)) |
| ) |
| } |
| |
| @Test |
| def testCreateExistingSnapshot(): Unit = { |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val snapshotId = new OffsetAndEpoch(numberOfRecords - 1, epoch) |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId), |
| "Creating an existing snapshot should not do anything") |
| } |
| |
| @Test |
| def testTopicId(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| assertEquals(KafkaRaftServer.MetadataTopicId, log.topicId()) |
| } |
| |
| @Test |
| def testReadMissingSnapshot(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| assertEquals(Optional.empty(), log.readSnapshot(new OffsetAndEpoch(10, 0))) |
| } |
| |
| @Test |
| def testDeleteNonExistentSnapshot(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| val offset = 10 |
| val epoch = 0 |
| |
| append(log, offset, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(offset)) |
| |
| assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(2L, epoch))) |
| assertEquals(0, log.startOffset) |
| assertEquals(epoch, log.lastFetchedEpoch) |
| assertEquals(offset, log.endOffset().offset) |
| assertEquals(offset, log.highWatermark.offset) |
| } |
| |
| @Test |
| def testTruncateFullyToLatestSnapshot(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| val numberOfRecords = 10 |
| val epoch = 0 |
| val sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch) |
| |
| append(log, numberOfRecords, epoch) |
| |
| TestUtils.resource(log.storeSnapshot(sameEpochSnapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertTrue(log.truncateToLatestSnapshot()) |
| assertEquals(sameEpochSnapshotId.offset, log.startOffset) |
| assertEquals(sameEpochSnapshotId.epoch, log.lastFetchedEpoch) |
| assertEquals(sameEpochSnapshotId.offset, log.endOffset().offset) |
| assertEquals(sameEpochSnapshotId.offset, log.highWatermark.offset) |
| |
| val greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1) |
| |
| append(log, numberOfRecords, epoch) |
| |
| TestUtils.resource(log.storeSnapshot(greaterEpochSnapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertTrue(log.truncateToLatestSnapshot()) |
| assertEquals(greaterEpochSnapshotId.offset, log.startOffset) |
| assertEquals(greaterEpochSnapshotId.epoch, log.lastFetchedEpoch) |
| assertEquals(greaterEpochSnapshotId.offset, log.endOffset().offset) |
| assertEquals(greaterEpochSnapshotId.offset, log.highWatermark.offset) |
| } |
| |
| @Test |
| def testTruncateWillRemoveOlderSnapshot(): Unit = { |
| val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime) |
| val numberOfRecords = 10 |
| val epoch = 1 |
| |
| append(log, 1, epoch - 1) |
| val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1) |
| TestUtils.resource(log.storeSnapshot(oldSnapshotId1).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| append(log, 1, epoch) |
| val oldSnapshotId2 = new OffsetAndEpoch(2, epoch) |
| TestUtils.resource(log.storeSnapshot(oldSnapshotId2).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| append(log, numberOfRecords - 2, epoch) |
| val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(oldSnapshotId3).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch) |
| append(log, numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(greaterSnapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertNotEquals(log.earliestSnapshotId(), log.latestSnapshotId()) |
| assertTrue(log.truncateToLatestSnapshot()) |
| assertEquals(log.earliestSnapshotId(), log.latestSnapshotId()) |
| log.close() |
| |
| mockTime.sleep(config.fileDeleteDelayMs) |
| // Assert that the log dir doesn't contain any older snapshots |
| Files |
| .walk(logDir, 1) |
| .map[Optional[SnapshotPath]](Snapshots.parse) |
| .filter(_.isPresent) |
| .forEach { path => |
| assertFalse(path.get.snapshotId.offset < log.startOffset) |
| } |
| } |
| |
| @Test |
| def testStartupWithInvalidSnapshotState(): Unit = { |
| // Initialize an empty log at offset 100. |
| var log = buildMetadataLog(tempDir, mockTime) |
| log.log.truncateFullyAndStartAt(newOffset = 100) |
| log.close() |
| |
| val metadataDir = metadataLogDir(tempDir) |
| assertTrue(metadataDir.exists()) |
| |
| // Initialization should fail unless we have a snapshot at an offset |
| // greater than or equal to 100. |
| assertThrows(classOf[IllegalStateException], () => { |
| buildMetadataLog(tempDir, mockTime) |
| }) |
| // Snapshots at offsets less than 100 are not sufficient. |
| writeEmptySnapshot(metadataDir, new OffsetAndEpoch(50, 1)) |
| assertThrows(classOf[IllegalStateException], () => { |
| buildMetadataLog(tempDir, mockTime) |
| }) |
| |
| // Snapshot at offset 100 should be fine. |
| writeEmptySnapshot(metadataDir, new OffsetAndEpoch(100, 1)) |
| log = buildMetadataLog(tempDir, mockTime) |
| log.log.truncateFullyAndStartAt(newOffset = 200) |
| log.close() |
| |
| // Snapshots at higher offsets are also fine. In this case, the |
| // log start offset should advance to the first snapshot offset. |
| writeEmptySnapshot(metadataDir, new OffsetAndEpoch(500, 1)) |
| log = buildMetadataLog(tempDir, mockTime) |
| assertEquals(500, log.log.logStartOffset) |
| } |
| |
| @Test |
| def testSnapshotDeletionWithInvalidSnapshotState(): Unit = { |
| // Initialize an empty log at offset 100. |
| val log = buildMetadataLog(tempDir, mockTime) |
| log.log.truncateFullyAndStartAt(newOffset = 100) |
| log.close() |
| |
| val metadataDir = metadataLogDir(tempDir) |
| assertTrue(metadataDir.exists()) |
| |
| // We have one deleted snapshot at an offset matching the start offset. |
| val snapshotId = new OffsetAndEpoch(100, 1) |
| writeEmptySnapshot(metadataDir, snapshotId) |
| |
| val deletedPath = Snapshots.markForDelete(metadataDir.toPath, snapshotId) |
| assertTrue(deletedPath.toFile.exists()) |
| |
| // Initialization should still fail. |
| assertThrows(classOf[IllegalStateException], () => { |
| buildMetadataLog(tempDir, mockTime) |
| }) |
| |
| // The snapshot marked for deletion should still exist. |
| assertTrue(deletedPath.toFile.exists()) |
| } |
| |
| private def metadataLogDir( |
| logDir: File |
| ): File = { |
| new File( |
| logDir.getAbsolutePath, |
| UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition) |
| ) |
| } |
| |
| private def writeEmptySnapshot( |
| metadataDir: File, |
| snapshotId: OffsetAndEpoch |
| ): Unit = { |
| val writer = FileRawSnapshotWriter.create( |
| metadataDir.toPath, |
| snapshotId, |
| Optional.empty() |
| ) |
| TestUtils.resource(writer)(_.freeze()) |
| } |
| |
| @Test |
| def testDoesntTruncateFully(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| val numberOfRecords = 10 |
| val epoch = 1 |
| |
| append(log, numberOfRecords, epoch) |
| |
| val olderEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch - 1) |
| TestUtils.resource(log.storeSnapshot(olderEpochSnapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertFalse(log.truncateToLatestSnapshot()) |
| |
| append(log, numberOfRecords, epoch) |
| |
| val olderOffsetSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(olderOffsetSnapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| assertFalse(log.truncateToLatestSnapshot()) |
| } |
| |
| @Test |
| def testCleanupPartialSnapshots(): Unit = { |
| val (logDir, log, _) = buildMetadataLogAndDir(tempDir, mockTime) |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val snapshotId = new OffsetAndEpoch(1, epoch) |
| |
| append(log, numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| log.close() |
| |
| // Create a few partial snapshots |
| Snapshots.createTempFile(logDir, new OffsetAndEpoch(0, epoch - 1)) |
| Snapshots.createTempFile(logDir, new OffsetAndEpoch(1, epoch)) |
| Snapshots.createTempFile(logDir, new OffsetAndEpoch(2, epoch + 1)) |
| |
| val secondLog = buildMetadataLog(tempDir, mockTime) |
| |
| assertEquals(snapshotId, secondLog.latestSnapshotId().get) |
| assertEquals(0, log.startOffset) |
| assertEquals(epoch, log.lastFetchedEpoch) |
| assertEquals(numberOfRecords, log.endOffset().offset) |
| assertEquals(0, secondLog.highWatermark.offset) |
| |
| // Assert that the log dir doesn't contain any partial snapshots |
| Files |
| .walk(logDir, 1) |
| .map[Optional[SnapshotPath]](Snapshots.parse) |
| .filter(_.isPresent) |
| .forEach { path => |
| assertFalse(path.get.partial) |
| } |
| } |
| |
| @Test |
| def testCleanupOlderSnapshots(): Unit = { |
| val (logDir, log, config) = buildMetadataLogAndDir(tempDir, mockTime) |
| val numberOfRecords = 10 |
| val epoch = 1 |
| |
| append(log, 1, epoch - 1) |
| val oldSnapshotId1 = new OffsetAndEpoch(1, epoch - 1) |
| TestUtils.resource(log.storeSnapshot(oldSnapshotId1).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| append(log, 1, epoch) |
| val oldSnapshotId2 = new OffsetAndEpoch(2, epoch) |
| TestUtils.resource(log.storeSnapshot(oldSnapshotId2).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| append(log, numberOfRecords - 2, epoch) |
| val oldSnapshotId3 = new OffsetAndEpoch(numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(oldSnapshotId3).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| val greaterSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch) |
| append(log, numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(greaterSnapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| log.close() |
| |
| val secondLog = buildMetadataLog(tempDir, mockTime) |
| |
| assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get) |
| assertEquals(3 * numberOfRecords, secondLog.startOffset) |
| assertEquals(epoch, secondLog.lastFetchedEpoch) |
| mockTime.sleep(config.fileDeleteDelayMs) |
| |
| // Assert that the log dir doesn't contain any older snapshots |
| Files |
| .walk(logDir, 1) |
| .map[Optional[SnapshotPath]](Snapshots.parse) |
| .filter(_.isPresent) |
| .forEach { path => |
| assertFalse(path.get.snapshotId.offset < log.startOffset) |
| } |
| } |
| |
| @Test |
| def testCreateReplicatedLogTruncatesFully(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| val numberOfRecords = 10 |
| val epoch = 1 |
| val snapshotId = new OffsetAndEpoch(numberOfRecords + 1, epoch + 1) |
| |
| append(log, numberOfRecords, epoch) |
| TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| log.close() |
| |
| val secondLog = buildMetadataLog(tempDir, mockTime) |
| |
| assertEquals(snapshotId, secondLog.latestSnapshotId().get) |
| assertEquals(snapshotId.offset, secondLog.startOffset) |
| assertEquals(snapshotId.epoch, secondLog.lastFetchedEpoch) |
| assertEquals(snapshotId.offset, secondLog.endOffset().offset) |
| assertEquals(snapshotId.offset, secondLog.highWatermark.offset) |
| } |
| |
| @Test |
| def testMaxBatchSize(): Unit = { |
| val leaderEpoch = 5 |
| val maxBatchSizeInBytes = 16384 |
| val recordSize = 64 |
| val log = buildMetadataLog(tempDir, mockTime, DefaultMetadataLogConfig.copy(maxBatchSizeInBytes = maxBatchSizeInBytes)) |
| |
| val oversizeBatch = buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes + recordSize) |
| assertThrows(classOf[RecordTooLargeException], () => { |
| log.appendAsLeader(oversizeBatch, leaderEpoch) |
| }) |
| |
| val undersizeBatch = buildFullBatch(leaderEpoch, recordSize, maxBatchSizeInBytes) |
| val appendInfo = log.appendAsLeader(undersizeBatch, leaderEpoch) |
| assertEquals(0L, appendInfo.firstOffset) |
| } |
| |
| @Test |
| def testTruncateBelowHighWatermark(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| val numRecords = 10 |
| val epoch = 5 |
| |
| append(log, numRecords, epoch) |
| assertEquals(numRecords.toLong, log.endOffset.offset) |
| |
| log.updateHighWatermark(new LogOffsetMetadata(numRecords)) |
| assertEquals(numRecords.toLong, log.highWatermark.offset) |
| |
| assertThrows(classOf[IllegalArgumentException], () => log.truncateTo(5L)) |
| assertEquals(numRecords.toLong, log.highWatermark.offset) |
| } |
| |
| private def buildFullBatch( |
| leaderEpoch: Int, |
| recordSize: Int, |
| maxBatchSizeInBytes: Int |
| ): MemoryRecords = { |
| val buffer = ByteBuffer.allocate(maxBatchSizeInBytes) |
| val batchBuilder = new BatchBuilder[Array[Byte]]( |
| buffer, |
| new ByteArraySerde, |
| CompressionType.NONE, |
| 0L, |
| mockTime.milliseconds(), |
| false, |
| leaderEpoch, |
| maxBatchSizeInBytes |
| ) |
| |
| val serializationCache = new ObjectSerializationCache |
| val records = Collections.singletonList(new Array[Byte](recordSize)) |
| while (!batchBuilder.bytesNeeded(records, serializationCache).isPresent) { |
| batchBuilder.appendRecord(records.get(0), serializationCache) |
| } |
| |
| batchBuilder.build() |
| } |
| |
| @Test |
| def testValidateEpochGreaterThanLastKnownEpoch(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val numberOfRecords = 1 |
| val epoch = 1 |
| |
| append(log, numberOfRecords, epoch) |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1) |
| assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) |
| assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val numberOfRecords = 10 |
| val epoch = 1 |
| |
| append(log, numberOfRecords, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) |
| |
| val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1) |
| assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) |
| assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateOffsetLessThanOldestSnapshotOffset(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val offset = 2 |
| val epoch = 1 |
| |
| append(log, offset, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(offset)) |
| |
| val snapshotId = new OffsetAndEpoch(offset, epoch) |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| // Simulate log cleaning advancing the LSO |
| log.log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion); |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset - 1, epoch) |
| assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) |
| assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateOffsetEqualToOldestSnapshotOffset(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val offset = 2 |
| val epoch = 1 |
| |
| append(log, offset, epoch) |
| log.updateHighWatermark(new LogOffsetMetadata(offset)) |
| |
| val snapshotId = new OffsetAndEpoch(offset, epoch) |
| TestUtils.resource(log.createNewSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch) |
| assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind) |
| assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): Unit = { |
| val offset = 10 |
| val numOfRecords = 5 |
| |
| val log = buildMetadataLog(tempDir, mockTime) |
| log.updateHighWatermark(new LogOffsetMetadata(offset)) |
| val snapshotId = new OffsetAndEpoch(offset, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| log.truncateToLatestSnapshot() |
| |
| |
| append(log, numOfRecords, epoch = 1) |
| append(log, numOfRecords, epoch = 2) |
| append(log, numOfRecords, epoch = 4) |
| |
| // offset is not equal to oldest snapshot's offset |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3) |
| assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) |
| assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateEpochLessThanFirstEpochInLog(): Unit = { |
| val offset = 10 |
| val numOfRecords = 5 |
| |
| val log = buildMetadataLog(tempDir, mockTime) |
| log.updateHighWatermark(new LogOffsetMetadata(offset)) |
| val snapshotId = new OffsetAndEpoch(offset, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot => |
| snapshot.freeze() |
| } |
| log.truncateToLatestSnapshot() |
| |
| append(log, numOfRecords, epoch = 3) |
| |
| // offset is not equal to oldest snapshot's offset |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2) |
| assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) |
| assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateOffsetGreatThanEndOffset(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val numberOfRecords = 1 |
| val epoch = 1 |
| |
| append(log, numberOfRecords, epoch) |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch) |
| assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) |
| assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateOffsetLessThanLEO(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val numberOfRecords = 10 |
| val epoch = 1 |
| |
| append(log, numberOfRecords, epoch) |
| append(log, numberOfRecords, epoch + 1) |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(11, epoch) |
| assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) |
| assertEquals(new OffsetAndEpoch(10, epoch), resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testValidateValidEpochAndOffset(): Unit = { |
| val log = buildMetadataLog(tempDir, mockTime) |
| |
| val numberOfRecords = 5 |
| val epoch = 1 |
| |
| append(log, numberOfRecords, epoch) |
| |
| val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords - 1, epoch) |
| assertEquals(ValidOffsetAndEpoch.Kind.VALID, resultOffsetAndEpoch.kind) |
| assertEquals(new OffsetAndEpoch(numberOfRecords - 1, epoch), resultOffsetAndEpoch.offsetAndEpoch()) |
| } |
| |
| @Test |
| def testAdvanceLogStartOffsetAfterCleaning(): Unit = { |
| val config = MetadataLogConfig( |
| logSegmentBytes = 512, |
| logSegmentMinBytes = 512, |
| logSegmentMillis = 10 * 1000, |
| retentionMaxBytes = 256, |
| retentionMillis = 60 * 1000, |
| maxBatchSizeInBytes = 512, |
| maxFetchSizeInBytes = DefaultMetadataLogConfig.maxFetchSizeInBytes, |
| fileDeleteDelayMs = LogConfig.DEFAULT_FILE_DELETE_DELAY_MS, |
| nodeId = 1 |
| ) |
| config.copy() |
| val log = buildMetadataLog(tempDir, mockTime, config) |
| |
| // Generate some segments |
| for(_ <- 0 to 100) { |
| append(log, 47, 1) // An odd number of records to avoid offset alignment |
| } |
| assertFalse(log.maybeClean(), "Should not clean since HW was still 0") |
| |
| log.updateHighWatermark(new LogOffsetMetadata(4000)) |
| assertFalse(log.maybeClean(), "Should not clean since no snapshots exist") |
| |
| val snapshotId1 = new OffsetAndEpoch(1000, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot => |
| append(snapshot, 100) |
| snapshot.freeze() |
| } |
| |
| val snapshotId2 = new OffsetAndEpoch(2000, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot => |
| append(snapshot, 100) |
| snapshot.freeze() |
| } |
| |
| val lsoBefore = log.startOffset() |
| assertTrue(log.maybeClean(), "Expected to clean since there was at least one snapshot") |
| val lsoAfter = log.startOffset() |
| assertTrue(lsoAfter > lsoBefore, "Log Start Offset should have increased after cleaning") |
| assertTrue(lsoAfter == snapshotId2.offset, "Expected the Log Start Offset to be less than or equal to the snapshot offset") |
| } |
| |
| @Test |
| def testDeleteSnapshots(): Unit = { |
| // Generate some logs and a few snapshots, set retention low and verify that cleaning occurs |
| val config = DefaultMetadataLogConfig.copy( |
| logSegmentBytes = 1024, |
| logSegmentMinBytes = 1024, |
| logSegmentMillis = 10 * 1000, |
| retentionMaxBytes = 1024, |
| retentionMillis = 60 * 1000, |
| maxBatchSizeInBytes = 100 |
| ) |
| val log = buildMetadataLog(tempDir, mockTime, config) |
| |
| for(_ <- 0 to 1000) { |
| append(log, 1, 1) |
| } |
| log.updateHighWatermark(new LogOffsetMetadata(1001)) |
| |
| for(offset <- Seq(100, 200, 300, 400, 500, 600)) { |
| val snapshotId = new OffsetAndEpoch(offset, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId).get()) { snapshot => |
| append(snapshot, 10) |
| snapshot.freeze() |
| } |
| } |
| |
| assertEquals(6, log.snapshotCount()) |
| assertTrue(log.maybeClean()) |
| assertEquals(1, log.snapshotCount(), "Expected only one snapshot after cleaning") |
| assertOptional(log.latestSnapshotId(), (snapshotId: OffsetAndEpoch) => { |
| assertEquals(600, snapshotId.offset) |
| }) |
| assertEquals(log.startOffset, 600) |
| } |
| |
| @Test |
| def testSoftRetentionLimit(): Unit = { |
| // Set retention equal to the segment size and generate slightly more than one segment of logs |
| val config = DefaultMetadataLogConfig.copy( |
| logSegmentBytes = 10240, |
| logSegmentMinBytes = 10240, |
| logSegmentMillis = 10 * 1000, |
| retentionMaxBytes = 10240, |
| retentionMillis = 60 * 1000, |
| maxBatchSizeInBytes = 100 |
| ) |
| val log = buildMetadataLog(tempDir, mockTime, config) |
| |
| for(_ <- 0 to 2000) { |
| append(log, 1, 1) |
| } |
| log.updateHighWatermark(new LogOffsetMetadata(2000)) |
| |
| // Then generate two snapshots |
| val snapshotId1 = new OffsetAndEpoch(1000, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot => |
| append(snapshot, 500) |
| snapshot.freeze() |
| } |
| |
| // Then generate a snapshot |
| val snapshotId2 = new OffsetAndEpoch(2000, 1) |
| TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot => |
| append(snapshot, 500) |
| snapshot.freeze() |
| } |
| |
| // Cleaning should occur, but resulting size will not be under retention limit since we have to keep one snapshot |
| assertTrue(log.maybeClean()) |
| assertEquals(1, log.snapshotCount(), "Expected one snapshot after cleaning") |
| assertOptional(log.latestSnapshotId(), (snapshotId: OffsetAndEpoch) => { |
| assertEquals(2000, snapshotId.offset, "Unexpected offset for latest snapshot") |
| assertOptional(log.readSnapshot(snapshotId), (reader: RawSnapshotReader) => { |
| assertTrue(reader.sizeInBytes() + log.log.size > config.retentionMaxBytes) |
| }) |
| }) |
| } |
| |
| @Test |
| def testSegmentsLessThanLatestSnapshot(): Unit = { |
| val config = DefaultMetadataLogConfig.copy( |
| logSegmentBytes = 10240, |
| logSegmentMinBytes = 10240, |
| logSegmentMillis = 10 * 1000, |
| retentionMaxBytes = 10240, |
| retentionMillis = 60 * 1000, |
| maxBatchSizeInBytes = 200 |
| ) |
| val log = buildMetadataLog(tempDir, mockTime, config) |
| |
| // Generate enough data to cause a segment roll |
| for (_ <- 0 to 2000) { |
| append(log, 10, 1) |
| } |
| log.updateHighWatermark(new LogOffsetMetadata(log.endOffset.offset)) |
| |
| // The clean up code requires that there are at least two snapshots |
| // Generate first snapshots that includes the first segment by using the base offset of the second segment |
| val snapshotId1 = new OffsetAndEpoch( |
| log.log.logSegments.drop(1).head.baseOffset, |
| 1 |
| ) |
| TestUtils.resource(log.storeSnapshot(snapshotId1).get()) { snapshot => |
| snapshot.freeze() |
| } |
| // Generate second snapshots that includes the second segment by using the base offset of the third segment |
| val snapshotId2 = new OffsetAndEpoch( |
| log.log.logSegments.drop(2).head.baseOffset, |
| 1 |
| ) |
| TestUtils.resource(log.storeSnapshot(snapshotId2).get()) { snapshot => |
| snapshot.freeze() |
| } |
| |
| // Sleep long enough to trigger a possible segment delete because of the default retention |
| val defaultLogRetentionMs = LogConfig.DEFAULT_RETENTION_MS * 2 |
| mockTime.sleep(defaultLogRetentionMs) |
| |
| assertTrue(log.maybeClean()) |
| assertEquals(1, log.snapshotCount()) |
| assertTrue(log.startOffset > 0, s"${log.startOffset} must be greater than 0") |
| val latestSnapshotOffset = log.latestSnapshotId().get.offset |
| assertTrue( |
| latestSnapshotOffset >= log.startOffset, |
| s"latest snapshot offset ($latestSnapshotOffset) must be >= log start offset (${log.startOffset})" |
| ) |
| } |
| } |
| |
| object KafkaMetadataLogTest { |
| class ByteArraySerde extends RecordSerde[Array[Byte]] { |
| override def recordSize(data: Array[Byte], serializationCache: ObjectSerializationCache): Int = { |
| data.length |
| } |
| override def write(data: Array[Byte], serializationCache: ObjectSerializationCache, out: Writable): Unit = { |
| out.writeByteArray(data) |
| } |
| override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size) |
| } |
| |
| val DefaultMetadataLogConfig = MetadataLogConfig( |
| logSegmentBytes = 100 * 1024, |
| logSegmentMinBytes = 100 * 1024, |
| logSegmentMillis = 10 * 1000, |
| retentionMaxBytes = 100 * 1024, |
| retentionMillis = 60 * 1000, |
| maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES, |
| maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES, |
| fileDeleteDelayMs = LogConfig.DEFAULT_FILE_DELETE_DELAY_MS, |
| nodeId = 1 |
| ) |
| |
| def buildMetadataLogAndDir( |
| tempDir: File, |
| time: MockTime, |
| metadataLogConfig: MetadataLogConfig = DefaultMetadataLogConfig |
| ): (Path, KafkaMetadataLog, MetadataLogConfig) = { |
| |
| val logDir = createLogDirectory( |
| tempDir, |
| UnifiedLog.logDirName(KafkaRaftServer.MetadataPartition) |
| ) |
| |
| val metadataLog = KafkaMetadataLog( |
| KafkaRaftServer.MetadataPartition, |
| KafkaRaftServer.MetadataTopicId, |
| logDir, |
| time, |
| time.scheduler, |
| metadataLogConfig |
| ) |
| |
| (logDir.toPath, metadataLog, metadataLogConfig) |
| } |
| |
| def buildMetadataLog( |
| tempDir: File, |
| time: MockTime, |
| metadataLogConfig: MetadataLogConfig = DefaultMetadataLogConfig, |
| ): KafkaMetadataLog = { |
| val (_, log, _) = buildMetadataLogAndDir(tempDir, time, metadataLogConfig) |
| log |
| } |
| |
| def append(log: ReplicatedLog, numberOfRecords: Int, epoch: Int): LogAppendInfo = { |
| log.appendAsLeader( |
| MemoryRecords.withRecords( |
| log.endOffset().offset, |
| CompressionType.NONE, |
| epoch, |
| (0 until numberOfRecords).map(number => new SimpleRecord(number.toString.getBytes)): _* |
| ), |
| epoch |
| ) |
| } |
| |
| def append(snapshotWriter: RawSnapshotWriter, numberOfRecords: Int): Unit = { |
| snapshotWriter.append(MemoryRecords.withRecords( |
| 0, |
| CompressionType.NONE, |
| 0, |
| (0 until numberOfRecords).map(number => new SimpleRecord(number.toString.getBytes)): _* |
| )) |
| } |
| |
| private def createLogDirectory(logDir: File, logDirName: String): File = { |
| val logDirPath = logDir.getAbsolutePath |
| val dir = new File(logDirPath, logDirName) |
| if (!Files.exists(dir.toPath)) { |
| Files.createDirectories(dir.toPath) |
| } |
| dir |
| } |
| } |