/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import java.util.Properties
import kafka.server.KafkaConfig
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record._
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_0_IV1, IBP_0_11_0_IV0, IBP_0_9_0}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, ArgumentsSource}
import scala.annotation.nowarn
import scala.collection._
import scala.jdk.CollectionConverters._
/**
* This is an integration test that exercises the fully integrated log cleaner
*/
class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrationTest {
val time = new MockTime()
val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2))
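// Write duplicate keys, run the cleaner, and verify the log shrinks while retaining the latest value per key;
// then append a large message plus more duplicates, clean again, and finally check that removing a partition
// also removes it from the cleaner checkpoint file.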
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
def cleanerTest(codec: CompressionType): Unit = {
val largeMessageKey = 20
val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, codec)
val maxMessageSize = largeMessageSet.sizeInBytes
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
val log = cleaner.logs.get(topicPartitions(0))
val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
val startSize = log.size
cleaner.startup()
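// cleaning should eventually reach the start of the active segment; the active segment itself is not cleaned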
val firstDirty = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
checkLogAfterAppendingDups(log, startSize, appends)
val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100, numDups = 3, log = log, codec = codec)
val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dups
val firstDirty2 = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty2)
checkLogAfterAppendingDups(log, startSize, appends2)
// simulate deleting a partition by removing it from logs,
// force a checkpoint,
// and make sure it's gone from the checkpoint file
cleaner.logs.remove(topicPartitions(0))
cleaner.updateCheckpoints(logDir, partitionToRemove = Option(topicPartitions(0)))
val checkpoints = new OffsetCheckpointFile(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
// we expect partition 0 to be gone
assertFalse(checkpoints.contains(topicPartitions(0)))
}
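// With cleanup.policy=compact,delete the cleaner compacts duplicate keys while retention still deletes old
// segments: verify compaction, force retention-based deletion of all old segments, then verify that
// compaction still works afterwards.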
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
def testCleansCombinedCompactAndDeleteTopic(codec: CompressionType): Unit = {
val logProps = new Properties()
val retentionMs: Integer = 100000
logProps.put(TopicConfig.RETENTION_MS_CONFIG, retentionMs: Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
def runCleanerAndCheckCompacted(numKeys: Int): (UnifiedLog, Seq[(Int, String, Long)]) = {
cleaner = makeCleaner(partitions = topicPartitions.take(1), propertyOverrides = logProps, backoffMs = 100L)
val log = cleaner.logs.get(topicPartitions(0))
val messages = writeDups(numKeys = numKeys, numDups = 3, log = log, codec = codec)
val startSize = log.size
log.updateHighWatermark(log.logEndOffset)
val firstDirty = log.activeSegment.baseOffset
cleaner.startup()
// should compact the log
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
(log, messages)
}
val (log, _) = runCleanerAndCheckCompacted(100)
// Set the last modified time to an old value to force deletion of old segments
val endOffset = log.logEndOffset
log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset,
"Timed out waiting for deletion of old segments")
assertEquals(1, log.numberOfSegments)
cleaner.shutdown()
// run the cleaner again to make sure there are no issues after the deletion
val (log2, messages) = runCleanerAndCheckCompacted(20)
val read = readFromLog(log2)
assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change")
}
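// Runs the cleaner on a log written with message format V0, then appends V1 and V2 batches as well to
// verify that compaction handles mixed message format versions correctly.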
@nowarn("cat=deprecation")
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
def testCleanerWithMessageFormatV0(codec: CompressionType): Unit = {
val largeMessageKey = 20
val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, codec)
val maxMessageSize = codec match {
case CompressionType.NONE => largeMessageSet.sizeInBytes
case _ =>
// the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
// increase because the broker offsets are larger than the ones assigned by the client
// adding `5` to the message set size is good enough for this test: it covers the increased message size while
// still being less than the overhead introduced by the conversion from message format version 0 to 1
largeMessageSet.sizeInBytes + 5
}
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
val log = cleaner.logs.get(topicPartitions(0))
val props = logConfigProperties(maxMessageSize = maxMessageSize)
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_9_0.version)
log.updateConfig(new LogConfig(props))
val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
val startSize = log.size
cleaner.startup()
val firstDirty = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
checkLogAfterAppendingDups(log, startSize, appends)
val appends2: Seq[(Int, String, Long)] = {
val dupsV0 = writeDups(numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
val largeMessageOffset = appendInfo.firstOffset.map[Long](_.messageOffset).get
// also add some messages with version 1 and version 2 to check that we handle mixed format versions correctly
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_11_0_IV0.version)
log.updateConfig(new LogConfig(props))
val dupsV1 = writeDups(startKey = 30, numKeys = 40, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
val dupsV2 = writeDups(startKey = 15, numKeys = 5, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V2)
appends ++ dupsV0 ++ Seq((largeMessageKey, largeMessageValue, largeMessageOffset)) ++ dupsV1 ++ dupsV2
}
val firstDirty2 = log.activeSegment.baseOffset
checkLastCleaned("log", 0, firstDirty2)
checkLogAfterAppendingDups(log, startSize, appends2)
}
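// Writes batches of messages as single (compressed) message sets in formats V0 and V1 and verifies that
// the cleaner can compact a log containing both.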
@nowarn("cat=deprecation")
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
def testCleaningNestedMessagesWithV0AndV1(codec: CompressionType): Unit = {
val maxMessageSize = 192
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, segmentSize = 256)
val log = cleaner.logs.get(topicPartitions(0))
val props = logConfigProperties(maxMessageSize = maxMessageSize, segmentSize = 256)
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_9_0.version)
log.updateConfig(new LogConfig(props))
// with compression enabled, these messages will be written as a single message containing
// all of the individual messages
var appendsV0 = writeDupsSingleMessageSet(numKeys = 2, numDups = 3, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
appendsV0 ++= writeDupsSingleMessageSet(numKeys = 2, startKey = 3, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V0)
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, IBP_0_10_0_IV1.version)
log.updateConfig(new LogConfig(props))
var appendsV1 = writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
appendsV1 ++= writeDupsSingleMessageSet(startKey = 4, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
appendsV1 ++= writeDupsSingleMessageSet(startKey = 6, numKeys = 2, numDups = 2, log = log, codec = codec, magicValue = RecordBatch.MAGIC_VALUE_V1)
val appends = appendsV0 ++ appendsV1
val startSize = log.size
cleaner.startup()
val firstDirty = log.activeSegment.baseOffset
assertTrue(firstDirty > appendsV0.size) // ensure we clean data from V0 and V1
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
checkLogAfterAppendingDups(log, startSize, appends)
}
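// Starts the cleaner with a 1-byte io buffer so that no cleaning can happen, then reconfigures it at runtime
// with a larger buffer and two threads and verifies that cleaning completes.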
@ParameterizedTest
@ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.AllCompressions])
def cleanerConfigUpdateTest(codec: CompressionType): Unit = {
val largeMessageKey = 20
val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, codec)
val maxMessageSize = largeMessageSet.sizeInBytes
cleaner = makeCleaner(partitions = topicPartitions, backoffMs = 1, maxMessageSize = maxMessageSize,
cleanerIoBufferSize = Some(1))
val log = cleaner.logs.get(topicPartitions(0))
writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
val startSize = log.size
cleaner.startup()
assertEquals(1, cleaner.cleanerCount)
// Verify no cleaning with LogCleanerIoBufferSizeProp=1
val firstDirty = log.activeSegment.baseOffset
val topicPartition = new TopicPartition("log", 0)
cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
assertTrue(cleaner.cleanerManager.allCleanerCheckpoints.isEmpty, "Should not have cleaned")
def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, "localhost:2181")
props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backoffMs.toString)
props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
KafkaConfig.fromProps(props)
}
// Verify that cleaning completes with a larger LogCleanerIoBufferSizeProp
val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
val newConfig = kafkaConfigWithCleanerConfig(new CleanerConfig(2,
cleaner.currentConfig.dedupeBufferSize,
cleaner.currentConfig.dedupeBufferLoadFactor,
100000,
cleaner.currentConfig.maxMessageSize,
cleaner.currentConfig.maxIoBytesPerSecond,
cleaner.currentConfig.backoffMs,
true))
cleaner.reconfigure(oldConfig, newConfig)
assertEquals(2, cleaner.cleanerCount)
checkLastCleaned("log", 0, firstDirty)
val compactedSize = log.logSegments.map(_.size).sum
assertTrue(startSize > compactedSize, s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize")
}
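// Waits until the cleaner checkpoint for the given partition has advanced to at least firstDirty.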
private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long): Unit = {
// wait until cleaning has reached the given base offset; note that cleaning happens only when the
// "log dirty ratio" is higher than TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG
val topicPartition = new TopicPartition(topic, partitionId)
cleaner.awaitCleaned(topicPartition, firstDirty)
val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(topicPartition)
assertTrue(lastCleaned >= firstDirty, s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned")
}
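// Reads the log back and verifies that only the latest value per key remains and that the log shrank
// relative to startSize.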
private def checkLogAfterAppendingDups(log: UnifiedLog, startSize: Long, appends: Seq[(Int, String, Long)]): Unit = {
val read = readFromLog(log)
assertEquals(toMap(appends), toMap(read), "Contents of the map shouldn't change")
assertTrue(startSize > log.size)
}
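// Duplicate keys collapse to the last (value, offset) seen, so the map reflects the latest write per key.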
private def toMap(messages: Iterable[(Int, String, Long)]): Map[Int, (String, Long)] = {
messages.map { case (key, value, offset) => key -> (value, offset) }.toMap
}
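// Decodes every record in every log segment back into (key, value, offset) triples.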
private def readFromLog(log: UnifiedLog): Iterable[(Int, String, Long)] = {
for (segment <- log.logSegments; deepLogEntry <- segment.log.records.asScala) yield {
val key = TestUtils.readString(deepLogEntry.key).toInt
val value = TestUtils.readString(deepLogEntry.value)
(key, value, deepLogEntry.offset)
}
}
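// Appends all generated key/value pairs in one MemoryRecords batch, so with compression enabled they are
// written as a single wrapper message containing the individual records.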
private def writeDupsSingleMessageSet(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType,
startKey: Int = 0, magicValue: Byte): Seq[(Int, String, Long)] = {
val kvs = for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
val payload = counter.toString
incCounter()
(key, payload)
}
val records = kvs.map { case (key, payload) =>
new SimpleRecord(key.toString.getBytes, payload.getBytes)
}
val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue, codec, records: _*), leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
val offsets = appendInfo.firstOffset.get.messageOffset to appendInfo.lastOffset
kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
}
}
object LogCleanerParameterizedIntegrationTest {
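// Provides every CompressionType as a parameterized test argument.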
class AllCompressions extends ArgumentsProvider {
override def provideArguments(context: ExtensionContext): java.util.stream.Stream[_ <: Arguments] =
java.util.Arrays.stream(CompressionType.values.map(codec => Arguments.of(codec)))
}
// zstd compression is not supported with the older message formats used here (i.e. V0 and V1)
class ExcludeZstd extends ArgumentsProvider {
override def provideArguments(context: ExtensionContext): java.util.stream.Stream[_ <: Arguments] =
java.util.Arrays.stream(CompressionType.values.filter(_ != CompressionType.ZSTD).map(codec => Arguments.of(codec)))
}
}