package kafka.log
import java.io.File
import java.util.Properties

import kafka.common.TopicAndPartition
import kafka.message._
import kafka.utils._
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

import scala.collection._
/**
 * This is an integration test that tests the fully integrated log cleaner.
 *
 * It checks that data younger than the configured minimum compaction lag
 * (`LogConfig.MinCompactionLagMsProp`) is not compacted, and that it is compacted once the
 * lag has elapsed. The test is parameterized by compression codec name.
 */
@RunWith(value = classOf[Parameterized])
class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging {
  val msPerHour = 60 * 60 * 1000

  // The mock clock is advanced in two halves of this lag (see cleanerTest), hence the
  // divisibility requirement.
  val compactionLag = 1 * msPerHour
  assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)

  val time = new MockTime(1400000000000L) // Tue May 13 16:53:20 UTC 2014
  val cleanerBackOffMs = 200L
  val segmentSize = 100
  val deleteDelay = 1000
  val logName = "log"
  val logDir = TestUtils.tempDir()
  // Incremented once per appended message and used as that message's payload (see writeDups).
  var counter = 0
  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
  val compressionCodec = CompressionCodec.getCompressionCodec(compressionCodecName)

  @Test
  def cleanerTest(): Unit = {
    val cleaner = makeCleaner(parts = 3, backOffMs = cleanerBackOffMs)
    val log = cleaner.logs.get(topics(0))

    // t = T0
    val T0 = time.milliseconds
    val appends0 = writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T0)
    val startSizeBlock0 = log.size
    debug(s"total log size at T0: $startSizeBlock0")

    val activeSegAtT0 = log.activeSegment
    debug(s"active segment at T0 has base offset: ${activeSegAtT0.baseOffset}")
    val sizeUpToActiveSegmentAtT0 = sizeOfSegmentsUpTo(log, activeSegAtT0.baseOffset)
    debug(s"log size up to base offset of active segment at T0: $sizeUpToActiveSegmentAtT0")

    // Start the cleaner's background thread(s). The "no cleaning yet" check and awaitCleaned
    // below are only meaningful while a cleaning thread is running.
    cleaner.startup()
    try {
      // T0 < t < T1
      // advance to a time still less than one compaction lag from start
      time.sleep(compactionLag / 2)
      Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
      assertEquals("There should be no cleaning until the compaction lag has passed", startSizeBlock0, log.size)

      // t = T1 > T0 + compactionLag
      // advance to time a bit more than one compaction lag from start
      time.sleep(compactionLag / 2 + 1)
      val T1 = time.milliseconds

      // write another block of data
      val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T1)
      val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset

      // the first block should get cleaned
      cleaner.awaitCleaned("log", 0, activeSegAtT0.baseOffset)

      // check the data is the same
      val read1 = readFromLog(log)
      assertEquals("Contents of the map shouldn't change.", appends1.toMap, read1.toMap)

      val compactedSize = sizeOfSegmentsUpTo(log, activeSegAtT0.baseOffset)
      debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize")

      // Fail with a descriptive message instead of a bare NoSuchElementException from Option.get.
      val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.getOrElse(topics(0),
        throw new AssertionError(s"no cleaner checkpoint was recorded for ${topics(0)}"))
      assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset)
      assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
        sizeUpToActiveSegmentAtT0 > compactedSize)
    } finally {
      // Stop the cleaner thread(s) even when an assertion fails, so they do not outlive the test.
      cleaner.shutdown()
    }
  }

  /** Sums `size` over the segments returned by `log.logSegments(0L, offset)`. */
  private def sizeOfSegmentsUpTo(log: Log, offset: Long): Long =
    log.logSegments(0L, offset).map(_.size).sum

  /**
   * Reads every message in every segment of `log` and decodes its key and payload as Ints.
   *
   * Uncompressed entries are yielded as-is. Compressed wrapper entries are expanded into their
   * inner messages, so the result is independent of the codec under test.
   *
   * @return (key, value) pairs in segment/offset order
   */
  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
      // create single message iterator or deep iterator depending on compression codec
      if (entry.message.compressionCodec == NoCompressionCodec)
        Iterator.single(entry)
      else
        ByteBufferMessageSet.deepIterator(entry)
    }) yield {
      val key = TestUtils.readString(messageAndOffset.message.key).toInt
      val value = TestUtils.readString(messageAndOffset.message.payload).toInt
      key -> value
    }
  }

  /**
   * Appends `numDups` rounds of keys `0 until numKeys` to `log`, one message per key per round.
   *
   * Each message uses `codec` and `timestamp`. Its payload is the current value of `counter`,
   * which is incremented after every append.
   *
   * @return the (key, payload) pairs that were appended, in append order
   */
  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
    for (_ <- 0 until numDups; key <- 0 until numKeys) yield {
      val count = counter
      log.append(TestUtils.singleMessageSet(payload = count.toString.getBytes, codec = codec,
        key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
      counter += 1
      (key, count)
    }
  }

  /** Shuts down the mock scheduler and deletes the temporary log directory after each test. */
  @After
  def teardown(): Unit = {
    time.scheduler.shutdown()
    Utils.delete(logDir)
  }

  /*
   * Create a cleaner instance and `parts` logs (directories "log-0", "log-1", ... under logDir),
   * each registered in the cleaner's pool as TopicAndPartition("log", i).
   *
   * Every log uses the class-level segment size, delete delay and compaction lag, plus the given
   * min cleanable dirty ratio and cleanup policy.
   * NOTE(review): policyOverrides is currently unused; it is kept so the signature stays stable.
   */
  private def makeCleaner(parts: Int,
                          minCleanableDirtyRatio: Float = 0.0F,
                          numThreads: Int = 1,
                          backOffMs: Long = 200L,
                          defaultPolicy: String = "compact",
                          policyOverrides: Map[String, String] = Map()): LogCleaner = {
    // create partitions and add them to the pool
    val logs = new Pool[TopicAndPartition, Log]()
    for (i <- 0 until parts) {
      val dir = new File(logDir, s"log-$i")
      val logProps = new Properties()
      logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
      logProps.put(LogConfig.SegmentIndexBytesProp, 100 * 1024: java.lang.Integer)
      logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
      logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
      // Honour the caller's policy. The default "compact" matches the previous hard-coded value.
      logProps.put(LogConfig.CleanupPolicyProp, defaultPolicy)
      logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)

      // Pass the properties built above to the log; previously they were never used.
      val log = new Log(dir = dir,
                        config = LogConfig(logProps),
                        recoveryPoint = 0L,
                        scheduler = time.scheduler,
                        time = time)
      logs.put(TopicAndPartition("log", i), log)
    }

    new LogCleaner(CleanerConfig(numThreads = numThreads, backOffMs = backOffMs),
                   logDirs = Array(logDir),
                   logs = logs,
                   time = time)
  }
}
object LogCleanerLagIntegrationTest {
def oneParameter: java.util.Collection[Array[String]] = {
val l = new java.util.ArrayList[Array[String]]()
def parameters: java.util.Collection[Array[String]] = {
val list = new java.util.ArrayList[Array[String]]()
for (codec <- CompressionType.values)