package kafka.log
import kafka.common._
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify, verifyNoMoreInteractions}
import java.io.{File, RandomAccessFile}
import java.nio._
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection._
import scala.jdk.CollectionConverters._
/** Unit tests for the log cleaning logic */
class LogCleanerTest {
// Shared fixture state used by the tests in this class.
// Temporary root directory and a randomly named partition log directory created beneath it.
val tmpdir = TestUtils.tempDir()
val dir = TestUtils.randomPartitionLogDir(tmpdir)
// Base topic configuration: 1024-byte segments and segment indexes, and the maximum allowed
// message timestamp difference set to Long.MaxValue.
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString)
val logConfig = new LogConfig(logProps)
// Mock clock; passed to the throttler below and to objects the tests construct.
val time = new MockTime()
// Throttler built with the largest possible desired rate and check interval
// (presumably so that throttling never kicks in during a test -- confirm).
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
// 86400000 ms = 24 hours.
val tombstoneRetentionMs = 86400000
// Leaves room for tombstoneRetentionMs (plus one) to be added without overflowing a Long.
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
// Producer state manager configuration using the broker's default producer-id expiration;
// the meaning of the boolean flag is not visible here.
val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
// Cleanup hook (presumably run after each test via JUnit's @AfterEach, which this file imports).
// NOTE(review): the body and closing brace are not visible in this view; presumably it deletes
// tmpdir -- confirm against the full file.
def teardown(): Unit = {
// Constructs a LogCleaner while construction of KafkaMetricsGroup is mocked, then verifies that
// newGauge was invoked as many times as there are names in LogCleaner.MetricNames.
// NOTE(review): the comments below also mention shutting the cleaner down and verifying metric
// removal, but the corresponding calls and the body of the finally block are not visible in
// this view.
def testRemoveMetricsOnClose(): Unit = {
// Intercept KafkaMetricsGroup instances constructed from here on so their calls can be verified.
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
// shutdown logCleaner so that metrics are removed
// First KafkaMetricsGroup constructed while the mock was active
// (presumably the one owned by logCleaner -- confirm).
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(0)
val numMetricsRegistered = LogCleaner.MetricNames.size
// newGauge must have been invoked exactly numMetricsRegistered times, with any arguments.
verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any())
// verify that each metric is removed
// assert that we have verified all invocations on
} finally {
/** Test simple log cleaning */
// Fills a log with one record per offset until it has four segments, registers keys 1, 3, 5, 7, 9
// in a fake offset map, cleans the first three segments, and checks that exactly those keys were
// dropped and that the cleaner's bytesRead statistic matches the expected value.
def testCleanSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages to the log until we have four segments
// (each record uses the current log end offset for both arguments of record())
while(log.numberOfSegments < 4)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
val keysFound = LogTestUtils.keysInLog(log)
// sanity check: the keys in the log are exactly 0 until logEndOffset
assertEquals(0L until log.logEndOffset, keysFound)
// pretend we have the following keys
val keys = immutable.ListSet(1L, 3L, 5L, 7L, 9L)
val map = new FakeOffsetMap(Int.MaxValue)
// each of these keys is mapped to offset Long.MaxValue
keys.foreach(k => map.put(key(k), Long.MaxValue))
// clean the log
val segments = log.logSegments.take(3).toSeq
val stats = new CleanerStats()
// NOTE(review): the right-hand side of this definition is missing in this view.
val expectedBytesRead =
// keys expected to survive: everything currently in the log except the keys placed in the map
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata, -1)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead)
// Exercises cleaning while the log start offset is advanced from another thread. The log's
// replaceSegments() is overridden to wait on a latch before delegating to the real implementation.
// The test then checks that the first segment's file path carries the deleted-file suffix and
// that two segments remain.
// NOTE(review): several statements are not visible in this view (remaining LogLoader and
// UnifiedLog constructor arguments, latch countDown() calls, starting the thread, closing braces).
def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
// Latches used to order replaceSegments() against the helper thread.
val deleteStartLatch = new CountDownLatch(1)
val deleteCompleteLatch = new CountDownLatch(1)
// Construct a log instance. The replaceSegments() method of the log instance is overridden so that
// it waits for another thread to execute deleteOldSegments()
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024 : java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val logDirFailureChannel = new LogDirFailureChannel(10)
// 5 * 60 * 1000 ms = 5 minutes
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs
val logSegments = new LogSegments(topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
// NOTE(review): the LogLoader argument list is truncated in this view.
val offsets = new LogLoader(
hadCleanShutdown = true,
val localLog = new LocalLog(dir, config, logSegments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, time.scheduler, time, topicPartition, logDirFailureChannel)
// Anonymous UnifiedLog subclass whose replaceSegments() blocks until deleteCompleteLatch is released.
val log = new UnifiedLog(offsets.logStartOffset,
brokerTopicStats = new BrokerTopicStats,
producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
leaderEpochCache = leaderEpochCache,
producerStateManager = producerStateManager,
_topicId = None,
keepPartitionMetadataFile = true) {
override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment]): Unit = {
// fail the test rather than hang if the other thread does not signal within 5 seconds
if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Log segment deletion timed out")
super.replaceSegments(newSegments, oldSegments)
// Start a thread which execute log.deleteOldSegments() right before replaceSegments() is executed
val t = new Thread() {
override def run(): Unit = {
// wait (up to 5 seconds) for the start signal, then move the log start offset up to the
// active segment's base offset
deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset, LogStartOffsetIncrementReason.LeaderOffsetIncremented)
// Append records so that segment number increase to 3
while (log.numberOfSegments < 3) {
log.appendAsLeader(record(key = 0, log.logEndOffset.toInt), leaderEpoch = 0)
assertEquals(3, log.numberOfSegments)
// Remember reference to the first log and determine its file name expected for async deletion
val firstLogFile = log.logSegments.head.log
// replacing the empty suffix appends DELETED_FILE_SUFFIX to the current path
val expectedFileName = Utils.replaceSuffix(firstLogFile.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX)
// Clean the log. This should trigger replaceSegments() and deleteOldSegments();
val offsetMap = new FakeOffsetMap(Int.MaxValue)
val cleaner = makeCleaner(Int.MaxValue)
// every segment between offset 0 and the active segment's base offset
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats()
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata, -1)
// Validate based on the file name that log segment file is renamed exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath)
assertEquals(2, log.numberOfSegments)
// Configures a compacted topic with preallocation enabled and a 1024-byte segment size, appends
// five records alternating between two keys, cleans the log, and asserts that a size is smaller
// than the configured segment size (i.e. the cleaned segment file was trimmed).
def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = {
val originalMaxFileSize = 1024;
val cleaner = makeCleaner(2)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, originalMaxFileSize: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact": java.lang.String)
logProps.put(TopicConfig.PREALLOCATE_CONFIG, "true": java.lang.String)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0
log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2
log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4
// roll the segment, so we can clean the messages already appended
// NOTE(review): the roll call itself is not visible in this view.
// clean the log with only one message removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
// NOTE(review): the left-hand operand of this comparison is missing in this view; per the
// message it should be the size of the cleaned segment file.
assertTrue( < originalMaxFileSize,
"Cleaned segment file should be trimmed to its real size.")
// Verifies that idempotent-producer duplicate detection keeps working after compaction.
// Three producers append overlapping keys, the log is cleaned, and re-sending each producer's
// original batch must report the offsets of the original append. A fourth producer then
// overwrites key 2, the log is cleaned again, and producer 1's duplicate is still recognised.
// NOTE(review): calls that roll or reload the log between steps are not visible in this view.
def testDuplicateCheckAfterCleaning(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
// var because reloadLog() below replaces the log instance
var log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val pid1 = 1
val pid2 = 2
val pid3 = 3
val pid4 = 4
// keys 1,2,3 / 3,1,4 / 1,4; the assertions below place them at offsets 0-2, 3-5 and 6-7
appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
// batch boundaries and last sequence numbers are unchanged by cleaning ...
assertEquals(List(2, 5, 7), lastOffsetsPerBatchInLog(log))
assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1), lastSequencesInLog(log))
// ... while only the last occurrence of each key (offsets 1, 3, 6, 7) remains
assertEquals(List(2, 3, 1, 4), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 6, 7), offsetsInLog(log))
// we have to reload the log to validate that the cleaner maintained sequence numbers correctly
def reloadLog(): Unit = {
log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps), recoveryPoint = 0L)
// check duplicate append from producer 1
var logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
assertEquals(2L, logAppendInfo.lastOffset)
// check duplicate append from producer 3
logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
assertEquals(6L, logAppendInfo.firstOffset.get.messageOffset)
assertEquals(7L, logAppendInfo.lastOffset)
// check duplicate append from producer 2
logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
assertEquals(3L, logAppendInfo.firstOffset.get.messageOffset)
assertEquals(5L, logAppendInfo.lastOffset)
// do one more append and a round of cleaning to force another deletion from producer 1's batch
appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1, pid4 -> 0), lastSequencesInLog(log))
assertEquals(List(2, 5, 7, 8), lastOffsetsPerBatchInLog(log))
assertEquals(List(3, 1, 4, 2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 6, 7, 8), offsetsInLog(log))
// duplicate append from producer1 should still be fine
logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
assertEquals(2L, logAppendInfo.lastOffset)
/**
 * Asserts that the aborted transactions collected from `log`, using start offset 0 and the
 * log end offset as the upper bound, are equal to `expectedAbortedTxns`.
 *
 * @param expectedAbortedTxns the aborted transactions the log is expected to report
 * @param log                 the log whose aborted transactions are collected
 */
private def assertAllAbortedTxns(
  expectedAbortedTxns: List[AbortedTxn],
  log: UnifiedLog
): Unit = {
  val abortedTxns = log.collectAbortedTransactions(startOffset = 0L, upperBoundOffset = log.logEndOffset)
  assertEquals(expectedAbortedTxns, abortedTxns)
}
/**
 * Asserts that every entry in `log.activeProducers` reports a `currentTxnStartOffset()` of -1,
 * the value this helper treats as "no transaction in progress".
 *
 * @param log the log whose active producers are inspected
 */
private def assertAllTransactionsComplete(log: UnifiedLog): Unit = {
  assertTrue(log.activeProducers.forall(_.currentTxnStartOffset() == -1))
}
// Interleaves committed and aborted transactions from two producers (offsets 0-19), then cleans
// in several passes with an offset map limited to four slots. After each pass it checks how far
// the dirty offset advanced, which batches and offsets remain, and what the log reports as
// aborted transactions.
// NOTE(review): the calls that trigger each pass (and any roll or clock advance between them)
// are not visible in this view; only the assertions following each pass are.
def testMultiPassSegmentCleaningWithAbortedTransactions(): Unit = {
// Verify that the log cleaner preserves aborted transaction state (including the index)
// even if the cleaner cannot clean the whole segment in one pass.
val deleteRetentionMs = 50000
// small offset map so that one pass can only cover part of the log
val offsetMapSlots = 4
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, deleteRetentionMs.toString)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId1 = 1
val producerId2 = 2
val appendProducer1 = appendTransactionalAsLeader(log, producerId1, producerEpoch)
val appendProducer2 = appendTransactionalAsLeader(log, producerId2, producerEpoch)
// helpers that append an abort / commit marker for the given producer
def abort(producerId: Long): Unit = {
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.REPLICATION)
def commit(producerId: Long): Unit = {
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.REPLICATION)
// Append some transaction data (offset range in parenthesis)
appendProducer1(Seq(1, 2)) // [0, 1]
appendProducer2(Seq(2, 3)) // [2, 3]
appendProducer1(Seq(3, 4)) // [4, 5]
commit(producerId1) // [6, 6]
commit(producerId2) // [7, 7]
appendProducer1(Seq(2, 3)) // [8, 9]
abort(producerId1) // [10, 10]
appendProducer2(Seq(4, 5)) // [11, 12]
appendProducer1(Seq(5, 6)) // [13, 14]
commit(producerId1) // [15, 15]
abort(producerId2) // [16, 16]
appendProducer2(Seq(6, 7)) // [17, 18]
commit(producerId2) // [19, 19]
assertEquals(20L, log.logEndOffset)
// the two aborted transactions above: producer 1 starting at offset 8 with its marker at 10,
// producer 2 starting at offset 11 with its marker at 16
val expectedAbortedTxns = List(
new AbortedTxn(producerId1, 8, 10, 11),
new AbortedTxn(producerId2, 11, 16, 17)
assertAllAbortedTxns(expectedAbortedTxns, log)
// first offset not yet covered by an offset map; advanced by cleanSegments()
var dirtyOffset = 0L
// one cleaning pass: build a fresh (small) offset map from dirtyOffset, clean every segment
// below the active segment, then move dirtyOffset past the last offset the map covered
def cleanSegments(): Unit = {
val offsetMap = new FakeOffsetMap(slots = offsetMapSlots)
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats(time)
cleaner.buildOffsetMap(log, dirtyOffset, log.activeSegment.baseOffset, offsetMap, stats)
cleaner.cleanSegments(log, segments, offsetMap, time.milliseconds(), stats, new CleanedTransactionMetadata, Long.MaxValue)
dirtyOffset = offsetMap.latestOffset + 1
// On the first pass, we should see the data from the aborted transactions deleted,
// but the markers should remain until the deletion retention time has passed.
assertEquals(4L, dirtyOffset)
assertEquals(List(0, 2, 4, 6, 7, 10, 13, 15, 16, 17, 19), batchBaseOffsetsInLog(log))
assertEquals(List(0, 2, 3, 4, 5, 6, 7, 10, 13, 14, 15, 16, 17, 18, 19), offsetsInLog(log))
assertAllAbortedTxns(expectedAbortedTxns, log)
// On the second pass, no data from the aborted transactions remains. The markers
// still cannot be removed from the log due to the retention time, but we do not
// need to record them in the transaction index since they are empty.
assertEquals(14, dirtyOffset)
assertEquals(List(0, 2, 4, 6, 7, 10, 13, 15, 16, 17, 19), batchBaseOffsetsInLog(log))
assertEquals(List(0, 2, 4, 5, 6, 7, 10, 13, 14, 15, 16, 17, 18, 19), offsetsInLog(log))
assertAllAbortedTxns(List(), log)
// On the last pass, wait for the retention time to expire. The abort markers
// (offsets 10 and 16) should be deleted.
assertEquals(20L, dirtyOffset)
assertEquals(List(0, 2, 4, 6, 7, 13, 15, 17, 19), batchBaseOffsetsInLog(log))
assertEquals(List(0, 2, 4, 5, 6, 7, 13, 15, 17, 18, 19), offsetsInLog(log))
assertAllAbortedTxns(List(), log)
// Two transactional producers write overlapping keys, control markers are appended for both,
// the log is cleaned once, and the test checks the surviving keys and offsets and that the
// aborted-transaction list reported by the log is the same before and after cleaning.
// NOTE(review): some statements between the visible appends may be missing from this view.
def testBasicTransactionAwareCleaning(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val pid1 = 1
val pid2 = 2
val appendProducer1 = appendTransactionalAsLeader(log, pid1, producerEpoch)
val appendProducer2 = appendTransactionalAsLeader(log, pid2, producerEpoch)
appendProducer1(Seq(1, 2))
appendProducer2(Seq(2, 3))
appendProducer1(Seq(3, 4))
log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
log.appendAsLeader(commitMarker(pid2, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
log.appendAsLeader(commitMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// snapshot of the aborted transactions before cleaning, compared again at the end
val abortedTransactions = log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(List(3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 6, 7, 8, 9), offsetsInLog(log))
// ensure the transaction index is still correct
assertEquals(abortedTransactions, log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset))
// Three transactional producers interleave writes (with 1024-byte segments); producers 1 and 2
// are aborted and producer 3 is committed. The log is cleaned twice, the second time starting
// from the dirty offset returned by the first, and the surviving keys are checked after each.
// NOTE(review): the appends announced by the "append a couple extra segments" comment (and any
// roll calls) are not visible in this view.
def testCleanWithTransactionsSpanningSegments(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val pid1 = 1
val pid2 = 2
val pid3 = 3
val appendProducer1 = appendTransactionalAsLeader(log, pid1, producerEpoch)
val appendProducer2 = appendTransactionalAsLeader(log, pid2, producerEpoch)
val appendProducer3 = appendTransactionalAsLeader(log, pid3, producerEpoch)
appendProducer1(Seq(1, 2))
appendProducer3(Seq(2, 3))
appendProducer2(Seq(3, 4))
appendProducer2(Seq(5, 6))
appendProducer3(Seq(6, 7))
appendProducer1(Seq(7, 8))
log.appendAsLeader(abortMarker(pid2, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
appendProducer3(Seq(8, 9))
log.appendAsLeader(commitMarker(pid3, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
appendProducer1(Seq(9, 10))
log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// we have only cleaned the records in the first segment
// (the first element of clean()'s result is kept as the start of the next cleaning)
val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))._1
assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), LogTestUtils.keysInLog(log))
// append a couple extra segments in the new segment to ensure we have sequence numbers
// finally only the keys from pid3 should remain
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, dirtyOffset, log.activeSegment.baseOffset))
assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), LogTestUtils.keysInLog(log))
// Follows a commit marker through four cleaning rounds: it is kept while records of its
// transaction remain, kept again when cleaning is repeated at the same timestamp, and removed
// once cleaning runs with currentTime = Long.MaxValue.
// NOTE(review): the first assertions mention key 1, which no visible append produces before
// that point, so some appends (and roll calls) appear to be missing from this view.
def testCommitMarkerRemoval(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
appendProducer(Seq(2, 3))
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// cannot remove the marker in this pass because there are still valid records
// (dirtyOffset is the first element of doClean()'s result and seeds the next round)
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(1, 3, 2), LogTestUtils.keysInLog(log))
assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
appendProducer(Seq(1, 3))
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with same timestamp to verify marker is not removed early
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
// clean again with max timestamp to verify the marker is removed
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 1, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
/**
 * Tests log cleaning with batches that are deleted where no additional messages
 * are available to read in the buffer. Cleaning should continue from the next offset.
 */
// Uses a cleaner and topic limited to 100-byte messages with 1000-byte segments. After an abort
// marker and a commit marker are appended, one clean leaves key 2 at offsets 1, 3, 4; a further
// two-pass clean leaves offsets 3, 4.
// NOTE(review): appendProducer is defined but never called in the visible code, and the asserted
// key 2 has no visible append, so the data appends appear to be missing from this view.
def testDeletedBatchesWithNoMessagesRead(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(capacity = Int.MaxValue, maxMessageSize = 100)
val logProps = new Properties()
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 100: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1000: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(1, 3, 4), offsetsInLog(log))
// In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
// Two transactional producers write keys 2 and 3, which are later overwritten by plain records.
// Across successive cleanings the test checks, via the last offset of each remaining batch, that
// the producers' emptied batches stay in the log, and that producer 2's empty batch and commit
// marker disappear only after producer 2 writes again and further cleanings run.
// NOTE(review): calls that roll the log between steps are not visible in this view.
def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producer1 = appendTransactionalAsLeader(log, 1L, producerEpoch)
val producer2 = appendTransactionalAsLeader(log, 2L, producerEpoch)
// [{Producer1: 2, 3}]
producer1(Seq(2, 3)) // offsets 0, 1
// [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}]
producer2(Seq(2, 3)) // offsets 2, 3
log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0,
origin = AppendOrigin.COORDINATOR) // offset 4
// [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}], [{2}, {3}, {Producer1: Commit}]
// {0, 1}, {2, 3}, {4}, {5}, {6}, {7} ==> Offsets
log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 5
log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 6
log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
origin = AppendOrigin.COORDINATOR) // offset 7
// first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
// last offsets 1 and 3 belong to the two emptied transactional batches
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// append a new record from the producer to allow cleaning of the empty batch
// [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
// {1}, {3}, {4}, {5}, {6}, {7}, {8}, {9} ==> Offsets
producer2(Seq(1)) // offset 8
log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0,
origin = AppendOrigin.COORDINATOR) // offset 9
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
// The deleteHorizon for {Producer2: Commit} is still not set yet.
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
// In the first pass, the deleteHorizon for {Producer2: Commit} is set. In the second pass, it's removed.
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2, 3, 1), LogTestUtils.keysInLog(log))
assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Appends a commit marker followed by two plain records, runs a two-pass clean and then one more
// clean with currentTime = Long.MaxValue. After both steps the same state is expected: keys 2
// and 3 at offsets 1 and 2, and three batches ending at offsets 0, 1 and 2.
// NOTE(review): the trailing "offset N" comments on the appends do not line up with the asserted
// offsets (1, 2); some statements may be missing from this view.
def testCleanEmptyControlBatch(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
// [{Producer1: Commit}, {2}, {3}]
log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0,
origin = AppendOrigin.COORDINATOR) // offset 1
log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 2
log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 3
// first time through the control batch is retained as an empty batch
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
// In the first pass, the deleteHorizon for the commit marker is set. In the second pass, the commit marker is removed
// but the empty batch is retained for preserving the producer epoch.
var dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// the empty control batch does not cause an exception when cleaned
// Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = Long.MaxValue)._1
assertEquals(List(2, 3), LogTestUtils.keysInLog(log))
assertEquals(List(1, 2), offsetsInLog(log))
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
// With 128-byte segments, appends a commit marker for a transactional producer, runs a two-pass
// clean, and expects offsets 0 and 1 to remain as two separate batches.
// NOTE(review): appendTransaction is defined but never called in the visible code, so the record
// append (and any roll call) appears to be missing from this view.
def testCommittedTransactionSpanningSegments(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// Both the record and the marker should remain after cleaning
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(0, 1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// With 128-byte segments, appends an abort marker for a transactional producer and cleans twice
// from offset 0 (first at largeTimestamp, then at Long.MaxValue). Both times only offset 1
// remains as a record while two batches (ending at offsets 0 and 1) remain in the log.
// NOTE(review): appendTransaction is defined but never called in the visible code, so the record
// append (and any roll call) appears to be missing from this view.
def testAbortedTransactionSpanningSegments(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 128: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// Both the batch and the marker should remain after cleaning. The batch is retained
// because it is the last entry for this producerId. The marker is retained because
// there are still batches remaining from this transaction.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// The empty batch and the marker is still retained after a second cleaning.
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// Appends a transactional batch followed by an abort marker and a commit marker. One clean
// leaves key 3 at offsets 3, 4, 5; a two-pass clean starting from the returned dirty offset then
// leaves offsets 4, 5.
// NOTE(review): the asserted offsets go beyond what the visible appends produce, so some appends
// (and roll calls) appear to be missing from this view.
def testAbortMarkerRemoval(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
appendProducer(Seq(2, 3))
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// Aborted records are removed, but the abort marker is still preserved.
val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
// In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(3), LogTestUtils.keysInLog(log))
assertEquals(List(4, 5), offsetsInLog(log))
// Sets up two transactions from the same producer id and epoch, each ended by a commit marker,
// then appends two plain records. One clean and a subsequent two-pass clean are checked through
// the remaining offsets and the last offset of each remaining batch.
// NOTE(review): appendFirstTransaction and appendSecondTransaction are defined but never called
// in the visible code, and the roll mentioned below is not visible either.
def testEmptyBatchRemovalWithSequenceReuse(): Unit = {
// The group coordinator always writes batches beginning with sequence number 0. This test
// ensures that we still remove old empty batches and transaction markers under this expectation.
val producerEpoch = 0.toShort
val producerId = 1L
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 2048: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val appendFirstTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
origin = AppendOrigin.REPLICATION)
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
val appendSecondTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch,
origin = AppendOrigin.REPLICATION)
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
log.appendAsLeader(record(1, 1), leaderEpoch = 0)
log.appendAsLeader(record(2, 1), leaderEpoch = 0)
// Roll the log to ensure that the data is cleanable.
// Both transactional batches will be cleaned. The last one will remain in the log
// as an empty batch in order to preserve the producer sequence number and epoch
cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1, 3, 4, 5), offsetsInLog(log))
assertEquals(List(1, 2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
// In the first pass, the delete horizon for the first marker is set. In the second pass, the first marker is removed.
runTwoPassClean(cleaner, LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(3, 4, 5), offsetsInLog(log))
assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log))
// Walks an aborted transaction through several cleaning passes and checks, after each pass,
// the keys, record offsets and per-batch last offsets that remain in the log.
// NOTE(review): in this view the nested helper assertAbortedTransactionIndexed is neither
// closed nor called, and no roll/append is visible between the passes even though the
// comments describe one ("now update the last sequence") — confirm against the full file.
def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
// override the segment size to 256 bytes for this log
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 256: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
appendProducer(Seq(2, 3)) // batch last offset is 1
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
// Local check: exactly one aborted transaction for producerId, spanning offsets 0 through 2,
// is returned by collectAbortedTransactions(0, 100).
def assertAbortedTransactionIndexed(): Unit = {
val abortedTxns = log.collectAbortedTransactions(0L, 100L)
assertEquals(1, abortedTxns.size)
assertEquals(producerId, abortedTxns.head.producerId)
assertEquals(0, abortedTxns.head.firstOffset)
assertEquals(2, abortedTxns.head.lastOffset)
// first time through the records are removed
// doClean returns a tuple; its first element is carried forward as the next dirty offset
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// the empty batch remains if cleaned again because it still holds the last sequence
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
// now update the last sequence so that the empty batch can be removed
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)._1
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
// In the first pass, the delete horizon for the abort marker is set. In the second pass, the abort marker is removed.
dirtyOffset = runTwoPassClean(cleaner, LogToClean(tp, log, dirtyOffset, log.activeSegment.baseOffset), currentTime = largeTimestamp)
assertEquals(List(1), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
assertEquals(List(3), lastOffsetsPerBatchInLog(log))
// we do not bother retaining the aborted transaction in the index
assertEquals(0, log.collectAbortedTransactions(0L, 100L).size)
/**
 * Test log cleaning with logs containing messages larger than default message size.
 *
 * The cleaner is created with a 1024-byte max message size while every record value is
 * 1 MiB. The expected keys are derived from the keys observed before cleaning, so the final
 * assertion fails both when a key registered in the offset map survives and when an
 * unrelated key is dropped.
 */
def testLargeMessage(): Unit = {
  val largeMessageSize = 1024 * 1024
  // Create cleaner with very small default max message size
  val cleaner = makeCleaner(Int.MaxValue, maxMessageSize = 1024)
  val logProps = new Properties()
  logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer)
  logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize * 2: java.lang.Integer)
  val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

  // keys equal offsets; keep appending until a second segment exists
  while (log.numberOfSegments < 2)
    log.appendAsLeader(record(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
  val keysFound = LogTestUtils.keysInLog(log)
  assertEquals(0L until log.logEndOffset, keysFound)

  // pretend we have the following keys
  val keys = immutable.ListSet(1L, 3L, 5L, 7L, 9L)
  val map = new FakeOffsetMap(Int.MaxValue)
  keys.foreach(k => map.put(key(k), Long.MaxValue))

  // Fix the expectation BEFORE cleaning: everything that was in the log except the keys
  // registered in the offset map. Computing it from the cleaned log would hide lost keys.
  val shouldRemain = keysFound.filterNot(keys.contains)

  // clean the log
  val stats = new CleanerStats()
  cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata, -1)
  assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
/** Test log cleaning with logs containing messages larger than the topic's max message size. */
// Cleans the head segment of the log built by createLogWithMessagesLargerThanMaxSize with a
// cleaner limited to 1024-byte messages, then compares the keys left in the log.
// NOTE(review): the `filter` predicate below is cut off after `!` in this view, so the
// expected-key computation cannot be read here — restore it from the full file.
// NOTE(review): shouldRemain is computed from the log after cleaning — confirm that is intended.
def testMessageLargerThanMaxMessageSize(): Unit = {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
/**
 * Test log cleaning with logs containing messages larger than the topic's max message size,
 * where the header is corrupt.
 */
// Expects cleanSegments to throw CorruptRecordException for the head segment of the log
// built by createLogWithMessagesLargerThanMaxSize.
// NOTE(review): `file` is opened read-write on the head segment's log file, but no write or
// close is visible in this view; presumably the header is corrupted through it — confirm,
// and make sure the handle is closed. The assertThrows call is also not terminated here.
def testMessageLargerThanMaxMessageSizeWithCorruptHeader(): Unit = {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
/**
 * Test log cleaning with logs containing messages larger than the topic's max message size,
 * where the message size is corrupt and larger than the bytes available in the log segment.
 */
// Expects cleanSegments to throw CorruptRecordException for the head segment of the log
// built by createLogWithMessagesLargerThanMaxSize.
// NOTE(review): `file` is opened read-write on the head segment's log file, but the
// statements that would corrupt the size field and close the handle are not visible in this
// view — confirm against the full file. The assertThrows call is also not terminated here.
def testCorruptMessageSizeLargerThanBytesAvailable(): Unit = {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val file = new RandomAccessFile(log.logSegments.head.log.file, "rw")
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
/**
 * Builds a log of large keyed records and then lowers the topic's max message size to half
 * the size of the record values already written.
 *
 * @param largeMessageSize size in bytes of each record value appended to the log
 * @return the log, paired with an offset map in which keys 1, 3, 5, 7 and 9 map to Long.MaxValue
 */
def createLogWithMessagesLargerThanMaxSize(largeMessageSize: Int): (UnifiedLog, FakeOffsetMap) = {
  val topicProps = new Properties()
  topicProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, largeMessageSize * 16: java.lang.Integer)
  topicProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize * 2: java.lang.Integer)
  val log = makeLog(config = LogConfig.fromProps(logConfig.originals, topicProps))

  // each record uses its own offset as key; stop once a second segment exists
  while (log.numberOfSegments < 2) {
    val nextKey = log.logEndOffset.toInt
    log.appendAsLeader(record(nextKey, Array.fill(largeMessageSize)(0: Byte)), leaderEpoch = 0)
  }
  assertEquals(0L until log.logEndOffset, LogTestUtils.keysInLog(log))

  // Decrease the log's max message size
  topicProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, largeMessageSize / 2: java.lang.Integer)
  log.updateConfig(LogConfig.fromProps(logConfig.originals, topicProps))

  // pretend we have the following keys
  val offsetMap = new FakeOffsetMap(Int.MaxValue)
  for (k <- immutable.ListSet(1, 3, 5, 7, 9))
    offsetMap.put(key(k), Long.MaxValue)

  (log, offsetMap)
}
// Appends keyed records, writes a tombstone for every even key, pads the log to four
// segments, cleans up to the active segment and asserts that none of the tombstoned keys is
// still present in the log.
def testCleaningWithDeletes(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages with the keys 0 through N
while(log.numberOfSegments < 2)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
// delete all even keys between 0 and N
// leo is captured here so the same key range can be re-checked after cleaning
val leo = log.logEndOffset
for(key <- 0 until leo.toInt by 2)
log.appendAsLeader(tombstoneRecord(key), leaderEpoch = 0)
// append some new unique keys to pad out to a new active segment
while(log.numberOfSegments < 4)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
val keys = LogTestUtils.keysInLog(log).toSet
assertTrue((0 until leo.toInt by 2).forall(!keys.contains(_)), "None of the keys we deleted should still exist.")
// Appends five records alternating between keys 0 and 1, cleans from dirty offset 2 and
// checks the end offset and the CleanerStats counters reported by the pass.
// NOTE(review): the comment below mentions rolling the segment, but no log.roll() call is
// visible in this view — confirm against the full file.
def testLogCleanerStats(): Unit = {
// because loadFactor is 0.75, this means we can fit 3 messages in the map
val cleaner = makeCleaner(4)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0
log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2
log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4
// roll the segment, so we can clean the messages already appended
// size before cleaning; compared with stats.bytesRead below
val initialLogSize = log.size
val (endOffset, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
assertEquals(5, endOffset)
assertEquals(5, stats.messagesRead)
assertEquals(initialLogSize, stats.bytesRead)
assertEquals(2, stats.messagesWritten)
// log.size is read after cleaning, so this compares against the cleaned log's size
assertEquals(log.size, stats.bytesWritten)
assertEquals(0, stats.invalidMessagesRead)
assertTrue(stats.endTime >= stats.startTime)
// Appends one record without producer arguments and four records carrying producer ids 1-3,
// all using key 0 or 1, cleans the log, and then checks the last offset of each remaining
// batch, the last sequence per producer, the remaining keys and the remaining record offsets.
// NOTE(review): the comment below mentions rolling the segment, but no log.roll() call is
// visible in this view — confirm against the full file.
def testLogCleanerRetainsProducerLastSequence(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
log.appendAsLeader(record(0, 0), leaderEpoch = 0) // offset 0
log.appendAsLeader(record(0, 1, producerId = 1, producerEpoch = 0, sequence = 0), leaderEpoch = 0) // offset 1
log.appendAsLeader(record(0, 2, producerId = 2, producerEpoch = 0, sequence = 0), leaderEpoch = 0) // offset 2
log.appendAsLeader(record(0, 3, producerId = 3, producerEpoch = 0, sequence = 0), leaderEpoch = 0) // offset 3
log.appendAsLeader(record(1, 1, producerId = 2, producerEpoch = 0, sequence = 1), leaderEpoch = 0) // offset 4
// roll the segment, so we can clean the messages already appended
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
// batches ending at offsets 1, 3 and 4 are expected to remain, while only offsets 3 and 4
// (asserted last) are expected to still carry a keyed record with a value
assertEquals(List(1, 3, 4), lastOffsetsPerBatchInLog(log))
assertEquals(Map(1L -> 0, 2L -> 1, 3L -> 0), lastSequencesInLog(log))
assertEquals(List(0, 1), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4), offsetsInLog(log))
// Appends a transactional batch followed by an abort marker, cleans, and checks that the
// producer's last sequence is still reported although no keys remain; then appends again for
// the same producer, cleans again and re-checks batches, sequences, keys and offsets.
def testLogCleanerRetainsLastSequenceEvenIfTransactionAborted(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
appendProducer(Seq(2, 3))
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, origin = AppendOrigin.COORDINATOR)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
// after the first clean: no keys are left, yet last sequence 2 is still reported for the producer
assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log))
assertEquals(Map(producerId -> 2), lastSequencesInLog(log))
assertEquals(List(), LogTestUtils.keysInLog(log))
assertEquals(List(3), offsetsInLog(log))
// Append a new entry from the producer and verify that the empty batch is cleaned up
appendProducer(Seq(1, 5))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(List(3, 5), lastOffsetsPerBatchInLog(log))
assertEquals(Map(producerId -> 4), lastSequencesInLog(log))
assertEquals(List(1, 5), LogTestUtils.keysInLog(log))
assertEquals(List(3, 4, 5), offsetsInLog(log))
// Appends a committed transaction, then a second transaction whose record key is built from
// the COMMIT control-record key, cleans the log and checks that offsets 0-3 are all still
// present.
// NOTE(review): commitRecordKey() and the MemoryRecords.withTransactionalRecords(...) call
// are both cut short in this view (no returned buffer, missing arguments, no closing
// delimiters) — restore them from the full file.
def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val leaderEpoch = 5
val producerEpoch = 0.toShort
// First we append one committed transaction
val producerId1 = 1L
val appendProducer = appendTransactionalAsLeader(log, producerId1, producerEpoch, leaderEpoch)
log.appendAsLeader(commitMarker(producerId1, producerEpoch), leaderEpoch, origin = AppendOrigin.COORDINATOR)
// Now we append one transaction with a key which conflicts with the COMMIT marker appended above
def commitRecordKey(): ByteBuffer = {
// allocate a buffer exactly as large as the serialized COMMIT control-record key
val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
val key = ByteBuffer.allocate(keySize)
val producerId2 = 2L
val records = MemoryRecords.withTransactionalRecords(
new SimpleRecord(time.milliseconds(), commitRecordKey(), ByteBuffer.wrap("foo".getBytes))
log.appendAsLeader(records, leaderEpoch, origin = AppendOrigin.CLIENT)
log.appendAsLeader(commitMarker(producerId2, producerEpoch), leaderEpoch, origin = AppendOrigin.COORDINATOR)
assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
// After cleaning, the marker should not be removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log))
assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
// Runs three cleaning passes with a cleaner created via makeCleaner(2), raising the first
// dirty offset by one each time (2, 3, 4), and checks the keys and offsets left after every
// pass.
// NOTE(review): the comment below mentions rolling the segment, but no log.roll() call is
// visible in this view — confirm against the full file.
def testPartialSegmentClean(): Unit = {
// because loadFactor is 0.75, this means we can fit 1 message in the map
val cleaner = makeCleaner(2)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 0
log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 1
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 2
log.appendAsLeader(record(1,1), leaderEpoch = 0) // offset 3
log.appendAsLeader(record(0,0), leaderEpoch = 0) // offset 4
// roll the segment, so we can clean the messages already appended
// clean the log with only one message removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
assertEquals(List(1,0,1,0), LogTestUtils.keysInLog(log))
assertEquals(List(1,2,3,4), offsetsInLog(log))
// continue to make progress, even though we can only clean one message at a time
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, log.activeSegment.baseOffset))
assertEquals(List(0,1,0), LogTestUtils.keysInLog(log))
assertEquals(List(2,3,4), offsetsInLog(log))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, log.activeSegment.baseOffset))
assertEquals(List(1,0), LogTestUtils.keysInLog(log))
assertEquals(List(3,4), offsetsInLog(log))
// Fills a log with records whose keys cycle through 0..N-1, cleans only up to
// firstUncleanableOffset, and compares the number of distinct values per segment before and
// after the clean.
// NOTE(review): several lines are damaged in this view — the body of distinctValuesBySegment
// has lost its leading expression, and the two trailing assertions start mid-expression
// (their opening `assertTrue(...` part is not shown). Restore them from the full file.
def testCleaningWithUncleanableSection(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates.
val N = 10
val numCleanableSegments = 2
val numTotalSegments = 7
// append messages with the keys 0 through N-1, values equal offset
while(log.numberOfSegments <= numCleanableSegments)
log.appendAsLeader(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt), leaderEpoch = 0)
// at this point one message past the cleanable segments has been added
// the entire segment containing the first uncleanable offset should not be cleaned.
val firstUncleanableOffset = log.logEndOffset + 1 // +1 so it is past the baseOffset
while(log.numberOfSegments < numTotalSegments - 1)
log.appendAsLeader(record(log.logEndOffset.toInt % N, log.logEndOffset.toInt), leaderEpoch = 0)
// the last (active) segment has just one message
// declared as a def, so it is re-evaluated on every use (before and after the clean)
def distinctValuesBySegment = => => TestUtils.readString(record.value)).toSet.size).toSeq
val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
// reverse.tail drops the last segment from this precondition check
assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
"Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))
val distinctValuesBySegmentAfterClean = distinctValuesBySegment
.take(numCleanableSegments).forall { case (before, after) => after < before },
"The cleanable segments should have fewer number of values after cleaning")
.slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 }, "The uncleanable segments should have the same number of values after cleaning")
/**
 * Verifies that LogToClean.totalBytes equals the log size minus the active segment's size
 * when both the first dirty offset and the first uncleanable offset are the active
 * segment's base offset.
 */
def testLogToClean(): Unit = {
  // create a log with small segment size
  val logProps = new Properties()
  logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 100: java.lang.Integer)
  val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

  // create 6 segments with only one message in each segment
  // (a def, so a fresh record set is built for every append)
  def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
  for (_ <- 0 until 6)
    log.appendAsLeader(createRecords, leaderEpoch = 0)

  val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)

  // JUnit's signature is (expected, actual, message): the value computed from the log is the
  // expectation, the LogToClean property is the value under test.
  assertEquals(log.size - log.activeSegment.size, logToClean.totalBytes,
    "Total bytes of LogToClean should equal size of all segments excluding the active segment")
}
// Builds a LogToClean whose first dirty offset is the base offset of segment 2 and whose
// first uncleanable offset is the base offset of segment 4, then checks its cleanBytes,
// cleanableBytes and totalBytes against sizes summed from the log's segments.
// NOTE(review): the final assertion has lost its opening line in this view (it starts with
// the expected ratio expression) — restore it from the full file.
// NOTE(review): the assertEquals calls pass the LogToClean value first and the computed
// expectation second; JUnit expects (expected, actual) — consider swapping.
def testLogToCleanWithUncleanableSection(): Unit = {
// create a log with small segment size
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 100: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// create 6 segments with only one message in each segment
def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
for (_ <- 0 until 6)
log.appendAsLeader(createRecords, leaderEpoch = 0)
// segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
val segs = log.logSegments.toSeq
val logToClean = LogToClean(new TopicPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
// expected sizes, summed directly from the first two and the next two segments
val expectedCleanSize = segs.take(2).map(_.size).sum
val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
assertEquals(logToClean.cleanBytes, expectedCleanSize,
"Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty")
assertEquals(logToClean.cleanableBytes, expectedCleanableSize,
"Cleanable bytes of LogToClean should equal size of all segments from the one containing first dirty offset" +
" to the segment prior to the one with the first uncleanable offset")
assertEquals(logToClean.totalBytes, expectedCleanSize + expectedCleanableSize,
"Total bytes should be the sum of the clean and cleanable segments")
expectedCleanableSize / (expectedCleanSize + expectedCleanableSize).toDouble, 1.0e-6d,
"Total cleanable ratio should be the ratio of cleanable size to clean plus cleanable")
/**
 * Appends unkeyed records followed by keyed records, cleans the log and verifies that only
 * the keyed records remain, that the log shrank by the size of the unkeyed portion, and
 * that the cleaner counted every unkeyed record as invalid.
 */
def testCleaningWithUnkeyedMessages(): Unit = {
  val cleaner = makeCleaner(Int.MaxValue)

  // create a log with compaction turned off so we can append unkeyed messages
  val logProps = new Properties()
  logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
  val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

  // append unkeyed messages
  while (log.numberOfSegments < 2)
    log.appendAsLeader(unkeyedRecord(log.logEndOffset.toInt), leaderEpoch = 0)
  // snapshot taken while the log holds unkeyed records only
  val numInvalidMessages = unkeyedMessageCountInLog(log)
  val sizeWithUnkeyedMessages = log.size

  // append keyed messages
  while (log.numberOfSegments < 3)
    log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)

  val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
  val (_, stats) = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))

  assertEquals(0, unkeyedMessageCountInLog(log), "Log should only contain keyed messages after cleaning.")
  assertEquals(expectedSizeAfterCleaning, log.size, "Log should only contain keyed messages after cleaning.")
  // The message previously contained a literal, never-formatted "%d"; interpolate the count.
  assertEquals(numInvalidMessages, stats.invalidMessagesRead,
    s"Cleaner should have seen $numInvalidMessages invalid messages.")
}
/** Returns the base offset of every batch in the log, in the iteration order of log.logSegments. */
private def batchBaseOffsetsInLog(log: UnifiedLog): Iterable[Long] =
  log.logSegments.flatMap { segment =>
    segment.log.batches.asScala.map(batch => batch.baseOffset)
  }
/** Returns the last offset of every batch in the log, in the iteration order of log.logSegments. */
def lastOffsetsPerBatchInLog(log: UnifiedLog): Iterable[Long] =
  log.logSegments.flatMap { segment =>
    segment.log.batches.asScala.map(batch => batch.lastOffset)
  }
/**
 * Maps each producer id to the last sequence of its batches in the log. Control batches and
 * batches without a producer id are ignored; when a producer has several batches, the one
 * visited last wins.
 */
def lastSequencesInLog(log: UnifiedLog): Map[Long, Int] = {
  val sequencesByProducer = log.logSegments.flatMap { segment =>
    segment.log.batches.asScala
      .filter(batch => !batch.isControlBatch && batch.hasProducerId)
      .map(batch => batch.producerId -> batch.lastSequence)
  }
  sequencesByProducer.toMap
}
/* Extract the offsets of all records in the log that have both a value and a key. */
def offsetsInLog(log: UnifiedLog): Iterable[Long] =
  for {
    segment <- log.logSegments
    entry <- segment.log.records.asScala
    if entry.hasValue && entry.hasKey
  } yield entry.offset
/** Counts the records across all segments of the log that have a value but no key. */
def unkeyedMessageCountInLog(log: UnifiedLog): Int =
  log.logSegments.map(s => s.log.records.asScala.filter(_.hasValue).count(m => !m.hasKey)).sum
/**
 * Abort-check callback that unconditionally signals an abort.
 *
 * @param topicPartition the partition being checked; not consulted
 * @throws LogCleaningAbortedException on every call
 */
def abortCheckDone(topicPartition: TopicPartition): Unit =
  throw new LogCleaningAbortedException()
/** Test that an abort during cleaning throws a LogCleaningAbortedException. */
// Creates a cleaner whose abort check is abortCheckDone (which always throws) and expects
// cleanSegments over the first three segments to fail with LogCleaningAbortedException.
// NOTE(review): the assertThrows call is not terminated in this view — confirm against the full file.
def testCleanSegmentsWithAbort(): Unit = {
val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
// register every key currently in the log in the offset map, at Long.MaxValue
val keys = LogTestUtils.keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
/** Validate the logic for grouping log segments together for cleaning. */
// Builds a ten-segment log and calls groupSegmentsBySize with different maxSize /
// maxIndexSize limits, checking the number and size of the resulting groups.
// NOTE(review): the while loop below is not closed in this view, and the counter `i` is
// incremented but never read in the visible code — confirm against the full file.
def testSegmentGrouping(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 300: java.lang.Integer)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// append some messages to the log
var i = 0
while(log.numberOfSegments < 10) {
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
i += 1
// grouping by very large values should result in a single group with all the segments in it
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(1, groups.size)
assertEquals(log.numberOfSegments, groups.head.size)
// grouping by very small values should result in all groups having one entry
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(log.numberOfSegments, groups.size)
assertTrue(groups.forall(_.size == 1), "All groups should be singletons.")
// same expectation when the index-size limit, rather than the log-size limit, is tiny
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1, log.logEndOffset)
assertEquals(log.numberOfSegments, groups.size)
assertTrue(groups.forall(_.size == 1), "All groups should be singletons.")
val groupSize = 3
// check grouping by log size
// limit = combined size of the first groupSize segments, plus one byte
val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertTrue(groups.dropRight(1).forall(_.size == groupSize), "All but the last group should be the target size.")
// check grouping by index size
val indexSize = log.logSegments.take(groupSize).map(_.offsetIndex.sizeInBytes).sum + 1
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize, log.logEndOffset)
assertTrue(groups.dropRight(1).forall(_.size == groupSize),
"All but the last group should be the target size.")
// Checks how groupSegmentsBySize groups segments whose offsets are spread over ranges wider
// than Int.MaxValue, before and after cleaning has emptied some of them.
// NOTE(review): the for loop below is not closed in this view, and `records` is built but
// never appended in the visible code — confirm against the full file.
def testSegmentGroupingWithSparseOffsetsAndEmptySegments(): Unit ={
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val k="key".getBytes()
val v="val".getBytes()
// create 3 segments
for(i <- 0 until 3){
log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
// offsets 0 to Int.MaxValue cover Int.MaxValue + 1 messages; subtracting 1 gives the last message of the i-th segment
val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
assertEquals(i + 1, log.numberOfSegments)
// the 4th segment is the active one and is not cleaned
log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
val totalSegments = 4
// the last segment is not cleanable
val firstUncleanableOffset = log.logEndOffset - 1
val notCleanableSegments = 1
assertEquals(totalSegments, log.numberOfSegments)
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
// because the index file uses 4-byte relative offsets and the current segments are all non-empty,
// the segments will not be grouped even though they are very small.
assertEquals(totalSegments - notCleanableSegments, groups.size)
// clean the log so that the first 2 segments become empty
cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
assertEquals(totalSegments, log.numberOfSegments)
assertEquals(0, log.logSegments.head.size)
// after cleaning there are 2 empty segments; they are grouped together this time
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, firstUncleanableOffset)
val noneEmptySegment = 1
assertEquals(noneEmptySegment + 1, groups.size)
// trigger another clean; the 2 empty segments should be merged into 1
cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
assertEquals(totalSegments - 1, log.numberOfSegments)
/**
 * Validate the logic for grouping log segments together for cleaning when only a small number of
 * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
 * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be
 * stored in 4 bytes.
 */
// Checks that groupSegmentsBySize keeps the offset span of each group within Int.MaxValue as
// segments with widely separated offsets are added to the log.
// NOTE(review): `records` (a message at offset Int.MaxValue - 1) is built but never appended
// in the visible code — confirm against the full file.
def testSegmentGroupingWithSparseOffsets(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 400: java.lang.Integer)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
// fill up first segment
while (log.numberOfSegments == 1)
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
// forward offset and append message to next segment at offset Int.MaxValue
val records = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
assertEquals(Int.MaxValue, log.activeSegment.offsetIndex.lastOffset)
// grouping should result in a single group with maximum relative offset of Int.MaxValue
var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(1, groups.size)
// append another message, making last offset of second segment > Int.MaxValue
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
// grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(2, groups.size)
// append more messages, creating new segments, further grouping should still occur
while (log.numberOfSegments < 4)
log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(log.numberOfSegments - 1, groups.size)
// every group's span, measured on the offset indexes, must fit in an Int
for (group <- groups)
assertTrue(group.last.offsetIndex.lastOffset - group.head.offsetIndex.baseOffset <= Int.MaxValue,
"Relative offset greater than Int.MaxValue")
/**
 * Following the loading of a log segment where the index file is zero sized,
 * the index returned would be the base offset. Sometimes the log file would
 * contain data with offsets in excess of the baseOffset, which would cause
 * the log cleaner to group together segments with a range of > Int.MaxValue.
 * This test replicates that scenario to ensure that the segments are grouped
 * correctly.
 */
// Checks that groupSegmentsBySize yields two groups when the offset index's last offset
// understates the real offset range of the log, and that each group's true span
// (readNextOffset - 1 - baseOffset) still fits in an Int.
// NOTE(review): record1..record4 are built but never appended in the visible code —
// confirm against the full file.
def testSegmentGroupingFollowingLoadOfZeroIndex(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 400: java.lang.Integer)
//mimic the effect of loading an empty index file
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 400: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val record1 = messageWithOffset("hello".getBytes, "hello".getBytes, 0)
val record2 = messageWithOffset("hello".getBytes, "hello".getBytes, 1)
log.roll(Some(Int.MaxValue/2)) // starting a new log segment at offset Int.MaxValue/2
val record3 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue/2)
val record4 = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue.toLong + 1)
// preconditions: the real offset range exceeds Int.MaxValue while the index reports less
assertTrue(log.logEndOffset - 1 - log.logStartOffset > Int.MaxValue, "Actual offset range should be > Int.MaxValue")
assertTrue(log.logSegments.last.offsetIndex.lastOffset - log.logStartOffset <= Int.MaxValue,
"index.lastOffset is reporting the wrong last offset")
// grouping should result in two groups because the second segment takes the offset range > MaxInt
val groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue, log.logEndOffset)
assertEquals(2, groups.size)
for (group <- groups)
assertTrue(group.last.readNextOffset - 1 - group.head.baseOffset <= Int.MaxValue,
"Relative offset greater than Int.MaxValue")
/**
 * Asserts that the base offsets of the segments, read group by group in the order given,
 * are already sorted in increasing order.
 *
 * @param groups segment groups, e.g. as produced by groupSegmentsBySize
 */
private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
  // flatten every group into the sequence of its segments' base offsets
  val offsets = groups.flatMap(_.map(_.baseOffset))
  assertEquals(offsets.sorted, offsets, "Offsets should be in increasing order.")
}
/** Test building an offset map from the log. */
// Writes keys 0..499 (key == value) to a log and checks buildOffsetMap over three offset
// ranges delimited by segment base offsets.
// NOTE(review): the nested checkRange helper is not closed in this view — confirm against the full file.
def testBuildOffsetMap(): Unit = {
val map = new FakeOffsetMap(1000)
val log = makeLog()
val cleaner = makeCleaner(Int.MaxValue)
val start = 0
val end = 500
writeToLog(log, (start until end) zip (start until end))
// Builds the offset map for [start, end) and verifies its latest offset, its size, every
// key inside the range, the keys just outside both ends, and stats.mapMessagesRead.
def checkRange(map: FakeOffsetMap, start: Int, end: Int): Unit = {
val stats = new CleanerStats()
cleaner.buildOffsetMap(log, start, end, map, stats)
val endOffset = map.latestOffset + 1
assertEquals(end, endOffset, "Last offset should be the end offset.")
assertEquals(end-start, map.size, "Should have the expected number of messages in the map.")
for(i <- start until end)
assertEquals(i.toLong, map.get(key(i)), "Should find all the keys")
// a miss is expected to return -1
assertEquals(-1L, map.get(key(start - 1)), "Should not find a value too small")
assertEquals(-1L, map.get(key(end)), "Should not find a value too large")
assertEquals(end - start, stats.mapMessagesRead)
val segments = log.logSegments.toSeq
checkRange(map, 0, segments(1).baseOffset.toInt)
checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
// Cleans a segment with overflowed offsets: the first cleanSegments call is expected to throw
// LogCleaningAbortedException and leave one more segment with all keys intact; afterwards
// every segment is cleaned individually and the surviving keys are compared with the
// expectation.
// NOTE(review): the getOrElse block, the key-selection loop and both assertThrows/cleanSegments
// calls are not terminated in this view — confirm against the full file.
def testSegmentWithOffsetOverflow(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 1000: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
val log = makeLog(config = config, recoveryPoint = Long.MaxValue)
// fail fast if the fixture contains no overflowed segment
val segmentWithOverflow = LogTestUtils.firstOverflowSegment(log).getOrElse {
throw new AssertionError("Failed to create log with a segment which has overflowed offsets")
val numSegmentsInitial = log.logSegments.size
val allKeys = LogTestUtils.keysInLog(log).toList
val expectedKeysAfterCleaning = new mutable.ArrayBuffer[Long]()
// pretend we want to clean every alternate key
val offsetMap = new FakeOffsetMap(Int.MaxValue)
// allKeys(k) goes into the offset map; its predecessor allKeys(k - 1) is expected to survive
for (k <- 1 until allKeys.size by 2) {
expectedKeysAfterCleaning += allKeys(k - 1)
offsetMap.put(key(allKeys(k)), Long.MaxValue)
// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
cleaner.cleanSegments(log, Seq(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
assertEquals(allKeys, LogTestUtils.keysInLog(log))
// Clean each segment now that split is complete.
for (segmentToClean <- log.logSegments)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
/**
 * Tests recovery if the broker crashes at the following stages during the cleaning sequence
 * <ol>
 * <li> Cleaner has created .cleaned log containing multiple segments, swap sequence not yet started
 * <li> .cleaned log renamed to .swap, old segment files not yet renamed to .deleted
 * <li> .cleaned log renamed to .swap, old segment files renamed to .deleted, but not yet deleted
 * <li> .swap suffix removed, completing the swap, but async delete of .deleted files not yet complete
 * </ol>
 */
// Walks through six numbered crash points of the clean/swap sequence. For each one the log is cleaned, the
// on-disk file suffixes are rewritten to imitate the crash, and recoverAndCheck is called with the key set
// the recovered log is expected to hold (all keys when the clean is aborted, cleaned keys when it is resumed).
def testRecoveryAfterCrash(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 300: java.lang.Integer)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1: java.lang.Integer)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 10: java.lang.Integer)
val config = LogConfig.fromProps(logConfig.originals, logProps)
// create a log and append some messages
var log = makeLog(config = config)
var messageCount = 0
// each record uses the current log end offset as both key and value; stop once the log has 10 segments
while (log.numberOfSegments < 10) {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
messageCount += 1
val allKeys = LogTestUtils.keysInLog(log)
// pretend we have odd-numbered keys
val offsetMap = new FakeOffsetMap(Int.MaxValue)
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
// clean the log
// only the first nine segments are handed to the cleaner
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
// NOTE(review): no scheduler call follows this comment in the reviewed text (here and in the later stages) — confirm against the full file
var cleanedKeys = LogTestUtils.keysInLog(log)
// 1) Simulate recovery just after .cleaned file is created, before rename to .swap
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
// strip the deleted-file suffix from every file in the log dir so the old segment files reappear under their regular names
for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
log = recoverAndCheck(config, allKeys)
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
cleanedKeys = LogTestUtils.keysInLog(log)
// 2) Simulate recovery just after .cleaned file is created, and a subset of them are renamed to .swap
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.head.changeFileSuffixes("", UnifiedLog.CleanedFileSuffix)
// only the head segment's log file is moved on from the cleaned suffix to the swap suffix
log.logSegments.head.log.renameTo(new File(Utils.replaceSuffix(log.logSegments.head.log.file.getPath, UnifiedLog.CleanedFileSuffix, UnifiedLog.SwapFileSuffix)))
for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
log = recoverAndCheck(config, allKeys)
// clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
cleanedKeys = LogTestUtils.keysInLog(log)
// 3) Simulate recovery just after swap file is created, before old segment files are
// renamed to .deleted. Clean operation is resumed during recovery.
log.logSegments.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
for (file <- dir.listFiles if file.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(Utils.replaceSuffix(file.getPath, LogFileUtils.DELETED_FILE_SUFFIX, "")), false)
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again
while (log.numberOfSegments < 10) {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
messageCount += 1
// extend the offset map to cover the odd keys of the newly appended messages as well
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
cleanedKeys = LogTestUtils.keysInLog(log)
// 4) Simulate recovery after swap file is created and old segments files are renamed
// to .deleted. Clean operation is resumed during recovery.
log.logSegments.head.changeFileSuffixes("", UnifiedLog.SwapFileSuffix)
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again
while (log.numberOfSegments < 10) {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
messageCount += 1
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
cleanedKeys = LogTestUtils.keysInLog(log)
// 5) Simulate recovery after a subset of swap files are renamed to regular files and old segments files are renamed
// to .deleted. Clean operation is resumed during recovery.
// only the head segment's time index file is given the swap suffix here
log.logSegments.head.timeIndex.file.renameTo(new File(Utils.replaceSuffix(log.logSegments.head.timeIndex.file.getPath, "", UnifiedLog.SwapFileSuffix)))
log = recoverAndCheck(config, cleanedKeys)
// add some more messages and clean the log again
while (log.numberOfSegments < 10) {
log.appendAsLeader(record(log.logEndOffset.toInt, log.logEndOffset.toInt), leaderEpoch = 0)
messageCount += 1
for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata, -1)
// clear scheduler so that async deletes don't run
cleanedKeys = LogTestUtils.keysInLog(log)
// 6) Simulate recovery after swap is complete, but async deletion
// is not yet complete. Clean operation is resumed during recovery.
// no files are renamed for this stage; the log is simply recovered as it stands
log = recoverAndCheck(config, cleanedKeys)
// Builds an offset map over a log holding only two records whose offsets lie far apart (0 and 7206178)
// and checks that both keys, the map size and the latest offset are reported correctly.
def testBuildOffsetMapFakeLarge(): Unit = {
  val offsets = new FakeOffsetMap(1000)

  // tiny segments and indexes for this log
  val tinySegmentProps = new Properties()
  tinySegmentProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 120: java.lang.Integer)
  tinySegmentProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 120: java.lang.Integer)
  val log = makeLog(config = new LogConfig(tinySegmentProps))
  val cleaner = makeCleaner(Int.MaxValue)

  val firstKey = 0
  val keyLimit = 2
  val firstOffset = 0L
  val lastOffset = 7206178L
  val keys = firstKey until keyLimit

  // key i is written with value i; the two records land at firstOffset and lastOffset respectively
  writeToLog(log, keys zip keys, Seq(firstOffset, lastOffset))
  cleaner.buildOffsetMap(log, firstKey, lastOffset + 1L, offsets, new CleanerStats())

  assertEquals(lastOffset, offsets.latestOffset, "Last offset should be the end offset.")
  assertEquals(keyLimit - firstKey, offsets.size, "Should have the expected number of messages in the map.")
  assertEquals(0L, offsets.get(key(0)), "Map should contain first value")
  assertEquals(lastOffset, offsets.get(key(1)), "Map should contain second value")
}
/**
 * Test building a partial offset map of part of a log segment
 */
// Appends five records (keys 0 to 4), builds the offset map starting at offset 2 with a map that is too
// small for the remainder, and checks which keys made it into the map and how many messages were read.
def testBuildPartialOffsetMap(): Unit = {
  // because loadFactor is 0.75, this means we can fit 2 messages in the map
  val log = makeLog()
  val cleaner = makeCleaner(3)
  val offsets = cleaner.offsetMap

  // record i carries key i and value i
  (0 to 4).foreach { id =>
    log.appendAsLeader(record(id, id), leaderEpoch = 0)
  }

  val stats = new CleanerStats()
  cleaner.buildOffsetMap(log, 2, Int.MaxValue, offsets, stats)

  assertEquals(2, offsets.size)
  // keys outside the mapped range report -1
  assertEquals(-1, offsets.get(key(0)))
  assertEquals(2, offsets.get(key(2)))
  assertEquals(3, offsets.get(key(3)))
  assertEquals(-1, offsets.get(key(4)))
  assertEquals(4, stats.mapMessagesRead)
}
/**
 * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner
 */
// Appends two GZIP message sets produced by invalidCleanedMessage (one containing duplicate keys, one
// without), runs the cleaner, and checks that the value of every remaining record equals its offset.
def testCleanCorruptMessageSet(): Unit = {
val codec = CompressionType.GZIP
val logProps = new Properties()
val logConfig = new LogConfig(logProps)
val log = makeLog(config = logConfig)
val cleaner = makeCleaner(10)
// messages are constructed so that the payload matches the expected offset to
// make offset validation easier after cleaning
// one compressed log entry with duplicates
val dupSetKeys = (0 until 2) ++ (0 until 2)
val dupSetOffset = 25
// pair each key with the offset it will be written at (25, 26, 27, 28)
val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size)
// and one without (should still be fixed by the cleaner)
val noDupSetKeys = 3 until 5
val noDupSetOffset = 50
val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size)
log.appendAsFollower(invalidCleanedMessage(dupSetOffset, dupSet, codec))
log.appendAsFollower(invalidCleanedMessage(noDupSetOffset, noDupSet, codec))
// NOTE(review): the clean below is bounded by the active segment's base offset and no segment roll is
// visible before it in the reviewed text — confirm against the full file
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
// walk every record of every batch of every segment
for (segment <- log.logSegments; batch <- segment.log.batches.asScala; record <- batch.asScala) {
val value = TestUtils.readString(record.value).toLong
assertEquals(record.offset, value)
/**
 * Verify that the client can handle corrupted messages. Located here for now since the client
 * does not support writing messages with the old magic.
 */
// Reads a message set produced by invalidCleanedMessage back through MemoryRecords and checks that each
// record's value equals the offset it is reported at.
def testClientHandlingOfCorruptMessageSet(): Unit = {
  val firstOffset = 50
  val keys = 1 until 10
  // each key is paired with the offset it is expected to be stored at
  val keysAndOffsets = keys zip (firstOffset until firstOffset + keys.size)

  val corrupted = invalidCleanedMessage(firstOffset, keysAndOffsets)
  val readable = MemoryRecords.readableRecords(corrupted.buffer)

  readable.records.asScala.foreach { entry =>
    val expectedOffset = entry.offset
    val payload = TestUtils.readString(entry.value).toLong
    assertEquals(expectedOffset, payload)
  }
}
// Appends a record with a timestamp far in the future, then a tombstone for the same key with a timestamp
// far in the past, then an unrelated record, running the cleaner after each append.
// NOTE(review): the assertion calls that the two "The tombstone should be retained." message lines belong
// to are not visible in the reviewed text — confirm against the full file.
def testCleanTombstone(): Unit = {
val logConfig = new LogConfig(new Properties())
val log = makeLog(config = logConfig)
val cleaner = makeCleaner(10)
// Append a message with a large timestamp.
log.appendAsLeader(TestUtils.singletonRecords(value = "0".getBytes,
key = "0".getBytes,
timestamp = time.milliseconds() + logConfig.deleteRetentionMs + 10000), leaderEpoch = 0)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, log.activeSegment.baseOffset))
// Append a tombstone with a small timestamp and roll out a new log segment.
// a null value marks the record as a tombstone for key "0"
log.appendAsLeader(TestUtils.singletonRecords(value = null,
key = "0".getBytes,
timestamp = time.milliseconds() - logConfig.deleteRetentionMs - 10000), leaderEpoch = 0)
// NOTE(review): no segment roll is visible here or after the next append, although the comments mention one
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 1, log.activeSegment.baseOffset))
"The tombstone should be retained.")
// Append a message and roll out another log segment.
log.appendAsLeader(TestUtils.singletonRecords(value = "1".getBytes,
key = "1".getBytes,
timestamp = time.milliseconds()), leaderEpoch = 0)
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
"The tombstone should be retained.")
/**
 * Verify that the cleaner is able to move beyond missing offset records in the dirty log
 */
// Two scenarios, each on a fresh log in its own random partition directory: records are written at explicit
// offsets that leave gaps, the log is cleaned from offset 0 up to the active segment's base offset, and the
// returned next dirty offset must equal the active segment's base offset.
// NOTE(review): `log` and `nextDirtyOffset` are declared twice at the same level in the reviewed text, so the
// delimiters that scoped the two scenarios appear to be missing — confirm against the full file.
def testCleaningBeyondMissingOffsets(): Unit = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024*1024: java.lang.Integer)
val logConfig = new LogConfig(logProps)
val cleaner = makeCleaner(Int.MaxValue)
// scenario 1: a single offset gap before the active segment
val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
// roll new segment with baseOffset 11, leaving previous with holes in offset range [9,10]
// NOTE(review): no roll call is visible after this comment or the two roll comments in scenario 2
// active segment record
log.appendAsFollower(messageWithOffset(1015, 1015, 11L))
val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
assertEquals(log.activeSegment.baseOffset, nextDirtyOffset,
"Cleaning point should pass offset gap")
// scenario 2: offset gaps spread over several segments
val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
// roll new segment with baseOffset 15, leaving previous with holes in offset range [10, 14]
writeToLog(log, (15 to 24) zip (15 to 24), (15L to 24L))
// roll new segment with baseOffset 30, leaving previous with holes in offset range [25, 29]
// active segment record
log.appendAsFollower(messageWithOffset(1015, 1015, 30L))
val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
assertEquals(log.activeSegment.baseOffset, nextDirtyOffset,
"Cleaning point should pass offset gap in multiple segments")
// Creates a LogCleaner with no logs and defines a helper that asserts a named gauge reports 0; the cleaner
// is shut down in the finally clause.
// NOTE(review): the body of the try block (presumably the checkGauge calls) is not visible in the reviewed
// text — confirm against the full file.
def testMaxCleanTimeSecs(): Unit = {
val logCleaner = new LogCleaner(new CleanerConfig(true),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
// Asserts that the gauge obtained for `name` reports 0.
def checkGauge(name: String): Unit = {
// presumably newGauge hands back the gauge already registered under `name`, so the supplier returning
// 999 is never consulted — TODO confirm
val gauge = logCleaner.metricsGroup.newGauge(name, () => 999)
// if there are no cleaners, 0 is the default value
assertEquals(0, gauge.value())
try {
} finally logCleaner.shutdown()
// Checks that the cleaner's throttler takes its desired rate from the initial
// LogCleanerIoMaxBytesPerSecondProp value (10000000) and picks up the new value (20000000) after reconfigure.
// NOTE(review): the body of the finally clause is not visible in the reviewed text — confirm against the full file.
def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
oldKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "10000000")
// anonymous subclass with startup/shutdown stubbed out (see the comment inside)
val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
logDirs = Array(TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time) {
// shutdown() and startup() are called in LogCleaner.reconfigure().
// Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
override def startup(): Unit = {}
override def shutdown(): Unit = {}
try {
assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
// a second broker config that differs only in the throttle property set below
val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
newKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "20000000")
logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
} finally {
/**
 * Appends one single-record batch per (key, value) pair to `log` as a follower, placing each batch at the
 * offset found at the same position of `offsetSeq`.
 *
 * Pairs and offsets are matched positionally with `zip`, so surplus elements of the longer collection
 * are ignored.
 *
 * @param log           the log to append to
 * @param keysAndValues the (key, value) pairs to write, in order
 * @param offsetSeq     the offset to assign to each pair, in the same order
 * @return the last offset reported by each append, in input order
 */
private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
  // the generator's source expression was missing; both otherwise-unused parameters are paired up here
  for (((key, value), offset) <- keysAndValues.zip(offsetSeq))
    yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset
}
/**
 * Builds a message set in the invalid shape that old cleaner versions could produce: an outer batch written
 * with magic v1 whose inner records use magic v0.
 *
 * @param initialOffset the offset assigned to the first record; later records get consecutive offsets
 * @param keysAndValues the (key, value) pairs to write, each encoded via `toString.getBytes`
 * @param codec         the compression type of the outer batch
 * @return the built records
 */
private def invalidCleanedMessage(initialOffset: Long,
                                  keysAndValues: Iterable[(Int, Int)],
                                  codec: CompressionType = CompressionType.GZIP): MemoryRecords = {
  // this function replicates the old versions of the cleaner which under some circumstances
  // would write invalid compressed message sets with the outer magic set to 1 and the inner
  // magic set to 0
  // NOTE(review): the record construction and the buffer-size operand were truncated in the reviewed text
  // and have been restored from the surrounding fragments — confirm against version control
  val records = keysAndValues.map { case (key, value) =>
    LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0,
      RecordBatch.NO_TIMESTAMP,
      key.toString.getBytes,
      value.toString.getBytes)
  }

  // half of the total record size, clamped to the range [1024, 1 << 16] bytes
  val buffer = ByteBuffer.allocate(math.min(math.max(records.map(_.sizeInBytes()).sum / 2, 1024), 1 << 16))
  val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, codec, TimestampType.CREATE_TIME, initialOffset)

  // append without validation so the mismatching inner magic is kept as-is
  var offset = initialOffset
  records.foreach { record =>
    builder.appendUncheckedWithOffset(offset, record)
    offset += 1
  }

  builder.build()
}
/**
 * Wraps one uncompressed key/value record in a `MemoryRecords` instance that starts at `offset`.
 *
 * @param key    the record key bytes
 * @param value  the record value bytes
 * @param offset the offset passed to `MemoryRecords.withRecords` as the initial offset
 */
private def messageWithOffset(key: Array[Byte], value: Array[Byte], offset: Long): MemoryRecords = {
  val onlyRecord = new SimpleRecord(key, value)
  MemoryRecords.withRecords(offset, CompressionType.NONE, 0, onlyRecord)
}
/**
 * Integer convenience overload: key and value are encoded via `toString.getBytes` and handed to the
 * byte-array overload.
 */
private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords = {
  val keyBytes = key.toString.getBytes
  val valueBytes = value.toString.getBytes
  messageWithOffset(keyBytes, valueBytes, offset)
}
/**
 * Creates the log under test, using the test's mock time and its scheduler.
 *
 * @param dir           the log directory; defaults to the class-level `dir`
 * @param config        the log configuration; defaults to the class-level `logConfig`
 * @param recoveryPoint the recovery point handed to the log
 * @return the newly created log
 */
private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L): UnifiedLog = {
  // the callee of this named-argument list was missing; the `UnifiedLog` factory is restored here
  UnifiedLog(
    dir = dir,
    config = config,
    logStartOffset = 0L,
    recoveryPoint = recoveryPoint,
    scheduler = time.scheduler,
    time = time,
    brokerTopicStats = new BrokerTopicStats,
    maxTransactionTimeoutMs = 5 * 60 * 1000,
    producerStateManagerConfig = producerStateManagerConfig,
    producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
    logDirFailureChannel = new LogDirFailureChannel(10),
    topicId = None,
    keepPartitionMetadataFile = true
  )
}
/**
 * Creates a `Cleaner` backed by a `FakeOffsetMap` with `capacity` slots.
 *
 * @param capacity       the number of slots of the fake offset map
 * @param checkDone      callback handed to the cleaner; a no-op by default
 * @param maxMessageSize used for both the initial and the maximum I/O buffer size
 */
private def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = _ => (), maxMessageSize: Int = 64*1024) = {
  val fakeOffsets = new FakeOffsetMap(capacity)
  new Cleaner(
    id = 0,
    offsetMap = fakeOffsets,
    ioBufferSize = maxMessageSize,
    maxIoBufferSize = maxMessageSize,
    dupBufferLoadFactor = 0.75,
    throttler = throttler,
    time = time,
    checkDone = checkDone
  )
}
/**
 * Appends each (key, value) pair to `log` as leader (leader epoch 0), one record per append.
 *
 * @return the message offset of the first offset reported by each append, in input order
 */
private def writeToLog(log: UnifiedLog, seq: Iterable[(Int, Int)]): Iterable[Long] = {
  seq.map { case (key, value) =>
    val appendInfo = log.appendAsLeader(record(key, value), leaderEpoch = 0)
    appendInfo.firstOffset.get.messageOffset
  }
}
/** Encodes `id` as the bytes of its decimal string form, wrapped in a `ByteBuffer`. */
private def key(id: Long) = {
  val encodedId = id.toString.getBytes
  ByteBuffer.wrap(encodedId)
}
/**
 * Builds a single-record batch at base offset 0 with the current magic value and no compression.
 * Key and value are encoded via `toString.getBytes`; the producer and epoch fields default to the
 * `RecordBatch.NO_*` constants.
 */
private def record(key: Int, value: Int,
                   producerId: Long = RecordBatch.NO_PRODUCER_ID,
                   producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
                   sequence: Int = RecordBatch.NO_SEQUENCE,
                   partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
  val payload = new SimpleRecord(key.toString.getBytes, value.toString.getBytes)
  MemoryRecords.withIdempotentRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, CompressionType.NONE, producerId, producerEpoch, sequence,
    partitionLeaderEpoch, payload)
}
/**
 * Returns an appender that writes the given keys to `log` as a transactional batch.
 * This is `appendIdempotentAsLeader` with `isTransactional` fixed to `true`.
 *
 * @param log           the log to append to
 * @param producerId    the producer id stamped on every batch
 * @param producerEpoch the producer epoch stamped on every batch
 * @param leaderEpoch   the leader epoch used for the append
 * @param origin        the origin passed through to the append
 * @return a function taking the keys to append and returning the resulting append info
 */
private def appendTransactionalAsLeader(
  log: UnifiedLog,
  producerId: Long,
  producerEpoch: Short,
  leaderEpoch: Int = 0,
  origin: AppendOrigin = AppendOrigin.CLIENT
): Seq[Int] => LogAppendInfo = {
  // the call these named arguments belonged to was missing; delegation to the sibling helper is restored
  appendIdempotentAsLeader(
    log,
    producerId,
    producerEpoch,
    isTransactional = true,
    leaderEpoch = leaderEpoch,
    origin = origin
  )
}
/**
 * Returns an appender that writes the given keys to `log` as one idempotent (or, when `isTransactional`
 * is set, transactional) batch per invocation.
 *
 * The returned function keeps a running sequence number across invocations: each call uses the current
 * value as the batch's sequence and then advances it by the number of records appended.
 *
 * @param log             the log to append to
 * @param producerId      the producer id stamped on every batch
 * @param producerEpoch   the producer epoch stamped on every batch
 * @param isTransactional whether batches are built as transactional records
 * @param leaderEpoch     the leader epoch used for the append
 * @param origin          the origin passed through to the append
 * @return a function taking the keys to append and returning the resulting append info
 */
private def appendIdempotentAsLeader(
  log: UnifiedLog,
  producerId: Long,
  producerEpoch: Short,
  isTransactional: Boolean = false,
  leaderEpoch: Int = 0,
  origin: AppendOrigin = AppendOrigin.CLIENT
): Seq[Int] => LogAppendInfo = {
  // captured by the returned closure, so it survives between invocations
  var sequence = 0
  keys: Seq[Int] => {
    // the `keys.map` receiver of this lambda was missing and is restored here
    val simpleRecords = keys.map { key =>
      val keyBytes = key.toString.getBytes
      new SimpleRecord(time.milliseconds(), keyBytes, keyBytes) // the value doesn't matter since we validate offsets
    }
    // the `else` separating the two alternatives was missing and is restored here
    val records = if (isTransactional)
      MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
    else
      MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords.toArray: _*)
    sequence += simpleRecords.size
    log.appendAsLeader(records, leaderEpoch, origin)
  }
}
/** Builds a COMMIT end-transaction marker at offset 0 for the given producer; `timestamp` defaults to the mock clock. */
private def commitMarker(producerId: Long, producerEpoch: Short, timestamp: Long = time.milliseconds()): MemoryRecords = {
  endTxnMarker(
    producerId = producerId,
    producerEpoch = producerEpoch,
    controlRecordType = ControlRecordType.COMMIT,
    offset = 0L,
    timestamp = timestamp
  )
}
/** Builds an ABORT end-transaction marker at offset 0 for the given producer; `timestamp` defaults to the mock clock. */
private def abortMarker(producerId: Long, producerEpoch: Short, timestamp: Long = time.milliseconds()): MemoryRecords = {
  endTxnMarker(
    producerId = producerId,
    producerEpoch = producerEpoch,
    controlRecordType = ControlRecordType.ABORT,
    offset = 0L,
    timestamp = timestamp
  )
}
/**
 * Builds a `MemoryRecords` instance holding a single end-transaction marker of the given control type.
 *
 * @param producerId        the producer the marker belongs to
 * @param producerEpoch     the producer's epoch
 * @param controlRecordType the control record type of the marker
 * @param offset            the offset passed to `MemoryRecords.withEndTransactionMarker`
 * @param timestamp         the timestamp passed to `MemoryRecords.withEndTransactionMarker`
 */
private def endTxnMarker(producerId: Long, producerEpoch: Short, controlRecordType: ControlRecordType,
                         offset: Long, timestamp: Long): MemoryRecords = {
  val marker = new EndTransactionMarker(controlRecordType, 0)
  MemoryRecords.withEndTransactionMarker(
    offset,
    timestamp,
    RecordBatch.NO_PARTITION_LEADER_EPOCH,
    producerId,
    producerEpoch,
    marker
  )
}
/** Builds a singleton record set whose key is `key.toString.getBytes` and whose value is the given bytes. */
private def record(key: Int, value: Array[Byte]): MemoryRecords = {
  val keyBytes = key.toString.getBytes
  TestUtils.singletonRecords(key = keyBytes, value = value)
}
/** Builds a singleton record set from `value.toString.getBytes` without supplying a key. */
private def unkeyedRecord(value: Int): MemoryRecords = {
  val valueBytes = value.toString.getBytes
  TestUtils.singletonRecords(value = valueBytes)
}
/** Builds a record for `key` with a null value, i.e. a tombstone. */
private def tombstoneRecord(key: Int): MemoryRecords = {
  // typed as bytes so the (Int, Array[Byte]) overload of `record` is the one called
  val noValue: Array[Byte] = null
  record(key, noValue)
}
/**
 * Delegates to `LogTestUtils.recoverAndCheck` for the class-level `dir`, using a fresh `BrokerTopicStats`
 * and the test's mock time and scheduler.
 *
 * @param config       the log configuration to recover with
 * @param expectedKeys the keys passed on to `LogTestUtils.recoverAndCheck`
 * @return the log returned by `LogTestUtils.recoverAndCheck`
 */
private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
  val freshStats = new BrokerTopicStats()
  LogTestUtils.recoverAndCheck(dir, config, expectedKeys, freshStats, time, time.scheduler)
}
/**
 * We need to run a two pass clean to perform the following steps to simulate a proper clean:
 * 1. On the first run, set the delete horizon in the batches with tombstone or markers with empty txn records.
 * 2. For the second pass, we will advance the current time by tombstoneRetentionMs, which will cause the
 * tombstones to expire, leading to their prompt removal from the log.
 * Returns the first dirty offset in the log as a result of the second cleaning.
 */
// Runs `doClean` twice on the same LogToClean: once at `currentTime` and once just past
// `currentTime + tombstoneRetentionMs`; returns the first element of the second pass's result.
private def runTwoPassClean(cleaner: Cleaner, logToClean: LogToClean, currentTime: Long,
                            tombstoneRetentionMs: Long = 86400000) : Long = {
  // first pass at the supplied time; its result is discarded
  cleaner.doClean(logToClean, currentTime)

  // second pass one millisecond beyond the retention window
  val secondPassTime = currentTime + tombstoneRetentionMs + 1
  val secondPass = cleaner.doClean(logToClean, secondPassTime)
  secondPass._1
}
/**
 * A simple `OffsetMap` for tests, backed by a `java.util.HashMap` keyed by the UTF-8 decoded key bytes.
 * The map itself is not bounded; `slots` is only exposed as a member.
 *
 * @param slots the slot count this map reports
 */
class FakeOffsetMap(val slots: Int) extends OffsetMap {
  val map = new java.util.HashMap[String, Long]()
  // offset of the most recent put, or whatever updateLatestOffset last set; -1 until then
  var lastOffset = -1L

  // decode a duplicate so the caller's buffer is left untouched
  private def keyFor(key: ByteBuffer) =
    new String(Utils.readBytes(key.duplicate), StandardCharsets.UTF_8)

  /** Stores `offset` for `key` and records it as the latest offset. */
  override def put(key: ByteBuffer, offset: Long): Unit = {
    lastOffset = offset
    map.put(keyFor(key), offset)
  }

  /** Returns the offset stored for `key`, or -1 when the key is absent. */
  override def get(key: ByteBuffer): Long = {
    val k = keyFor(key)
    // the lookup after `k` was missing; absent keys yield -1, which is what the tests in this file assert
    if (map.containsKey(k))
      map.get(k)
    else
      -1L
  }

  /** Removes all entries; `lastOffset` is left as it is. */
  override def clear(): Unit = map.clear()

  override def size: Int = map.size

  override def latestOffset: Long = lastOffset

  override def updateLatestOffset(offset: Long): Unit = {
    lastOffset = offset
  }

  override def toString: String = map.toString
}