/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log

import java.io.File
import java.nio.file.Files
import java.util.Properties

import kafka.server.BrokerTopicStats
import kafka.utils.{Pool, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.junit.jupiter.api.{AfterEach, Tag}

import scala.collection.Seq
import scala.collection.mutable.ListBuffer
import scala.util.Random
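
/**
 * Harness for log cleaner integration tests: creates compacted [[UnifiedLog]]s under a
 * temporary directory, wires them into a [[LogCleaner]], and provides helpers for writing
 * duplicate keys so that concrete tests can assert on what compaction retains.
 *
 * A minimal sketch of a concrete subclass (hypothetical, for illustration only):
 * {{{
 * class MyCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
 *   override val time: MockTime = new MockTime()
 *
 *   @Test
 *   def testDupsAreCompacted(): Unit = {
 *     val tp = new TopicPartition("log", 0)
 *     cleaner = makeCleaner(partitions = Seq(tp))
 *     writeDups(numKeys = 100, numDups = 3, log = cleaner.logs.get(tp), codec = CompressionType.NONE)
 *     cleaner.startup()
 *     // ... let the cleaner run, then assert each key maps only to its latest value ...
 *   }
 * }
 * }}}
 */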
@Tag("integration")
abstract class AbstractLogCleanerIntegrationTest {
  var cleaner: LogCleaner = _
  val logDir = TestUtils.tempDir()

  private val logs = ListBuffer.empty[UnifiedLog]
  private val defaultMaxMessageSize = 128
  private val defaultMinCleanableDirtyRatio = 0.0F
  private val defaultMinCompactionLagMs = 0L
  private val defaultDeleteDelay = 1000
  private val defaultSegmentSize = 2048
  private val defaultMaxCompactionLagMs = Long.MaxValue

  // Concrete subclasses supply the mock clock that drives the scheduler and the logs.
  def time: MockTime

  @AfterEach
  def teardown(): Unit = {
    if (cleaner != null)
      cleaner.shutdown()
    time.scheduler.shutdown()
    logs.foreach(_.close())
    Utils.delete(logDir)
  }
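
  /**
   * Build the per-topic properties for a compacted log. Small defaults for segment size,
   * delete delay, dirty ratio, and compaction lag keep cleaning fast in tests; entries in
   * `propertyOverrides` replace any of the defaults set here.
   */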
  def logConfigProperties(propertyOverrides: Properties = new Properties(),
                          maxMessageSize: Int,
                          minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
                          minCompactionLagMs: Long = defaultMinCompactionLagMs,
                          deleteDelay: Int = defaultDeleteDelay,
                          segmentSize: Int = defaultSegmentSize,
                          maxCompactionLagMs: Long = defaultMaxCompactionLagMs): Properties = {
    val props = new Properties()
    props.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize: java.lang.Integer)
    props.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentSize: java.lang.Integer)
    props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 100 * 1024: java.lang.Integer)
    props.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, deleteDelay: java.lang.Integer)
    props.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
    props.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, minCleanableDirtyRatio: java.lang.Float)
    props.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString)
    props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, minCompactionLagMs: java.lang.Long)
    props.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, maxCompactionLagMs: java.lang.Long)
    props ++= propertyOverrides
    props
  }
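
  /**
   * Create one compacted log per partition under `logDir` and return a `LogCleaner`
   * managing all of them. The cleaner is not started; callers invoke `startup()` and
   * should assign the result to `cleaner` so that `teardown()` shuts it down.
   */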
  def makeCleaner(partitions: Iterable[TopicPartition],
                  minCleanableDirtyRatio: Float = defaultMinCleanableDirtyRatio,
                  numThreads: Int = 1,
                  backoffMs: Long = 15000L,
                  maxMessageSize: Int = defaultMaxMessageSize,
                  minCompactionLagMs: Long = defaultMinCompactionLagMs,
                  deleteDelay: Int = defaultDeleteDelay,
                  segmentSize: Int = defaultSegmentSize,
                  maxCompactionLagMs: Long = defaultMaxCompactionLagMs,
                  cleanerIoBufferSize: Option[Int] = None,
                  propertyOverrides: Properties = new Properties()): LogCleaner = {
    val logMap = new Pool[TopicPartition, UnifiedLog]()
    for (partition <- partitions) {
      val dir = new File(logDir, s"${partition.topic}-${partition.partition}")
      Files.createDirectories(dir.toPath)
      val logConfig = new LogConfig(logConfigProperties(propertyOverrides,
        maxMessageSize = maxMessageSize,
        minCleanableDirtyRatio = minCleanableDirtyRatio,
        minCompactionLagMs = minCompactionLagMs,
        deleteDelay = deleteDelay,
        segmentSize = segmentSize,
        maxCompactionLagMs = maxCompactionLagMs))
      val log = UnifiedLog(
        dir = dir,
        config = logConfig,
        logStartOffset = 0L,
        recoveryPoint = 0L,
        scheduler = time.scheduler,
        time = time,
        brokerTopicStats = new BrokerTopicStats,
        maxTransactionTimeoutMs = 5 * 60 * 1000,
        producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false),
        producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
        logDirFailureChannel = new LogDirFailureChannel(10),
        topicId = None,
        keepPartitionMetadataFile = true)
      logMap.put(partition, log)
      this.logs += log
    }

    val cleanerConfig = new CleanerConfig(
      numThreads,
      4 * 1024 * 1024L,                                  // dedupe buffer size
      0.9,                                               // dedupe buffer load factor
      cleanerIoBufferSize.getOrElse(maxMessageSize / 2), // I/O buffer size
      maxMessageSize,
      Double.MaxValue,                                   // no cleaner I/O throttling
      backoffMs,
      true)                                              // cleaner enabled
    new LogCleaner(cleanerConfig,
      logDirs = Array(logDir),
      logs = logMap,
      logDirFailureChannel = new LogDirFailureChannel(1),
      time = time)
  }
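
  // Monotonically increasing counter used to give every appended record a unique value.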
  private var ctr = 0
  def counter: Int = ctr
  def incCounter(): Unit = ctr += 1
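
  /**
   * Append `numDups` rounds of `numKeys` keys, each record carrying a fresh counter value,
   * and return a (key, value, offset) tuple per append. The high watermark is advanced
   * after every append so the cleaner's compaction bound covers the whole log.
   */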
  def writeDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType,
                startKey: Int = 0, magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): Seq[(Int, String, Long)] = {
    for (_ <- 0 until numDups; key <- startKey until (startKey + numKeys)) yield {
      val value = counter.toString
      val appendInfo = log.appendAsLeader(TestUtils.singletonRecords(value = value.getBytes, codec = codec,
        key = key.toString.getBytes, magicValue = magicValue), leaderEpoch = 0)
      // move LSO forward to increase compaction bound
      log.updateHighWatermark(log.logEndOffset)
      incCounter()
      (key, value, appendInfo.firstOffset.get.messageOffset)
    }
  }
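
  /**
   * Build a single record with a deterministic 128-character alphanumeric value (the same
   * size as `defaultMaxMessageSize`), returning the value along with the `MemoryRecords`
   * wrapping it.
   */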
  def createLargeSingleMessageSet(key: Int, messageFormatVersion: Byte, codec: CompressionType): (String, MemoryRecords) = {
    def messageValue(length: Int): String = {
      val random = new Random(0)
      new String(random.alphanumeric.take(length).toArray)
    }

    val value = messageValue(128)
    val messageSet = TestUtils.singletonRecords(value = value.getBytes, codec = codec, key = key.toString.getBytes,
      magicValue = messageFormatVersion)
    (value, messageSet)
  }
}