| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.log |
| |
| import java.io.PrintWriter |
| import com.yammer.metrics.core.{Gauge, MetricName} |
| import kafka.utils.TestUtils |
| import org.apache.kafka.common.TopicPartition |
| import org.apache.kafka.common.record.{CompressionType, RecordBatch} |
| import org.apache.kafka.server.metrics.KafkaYammerMetrics |
| import org.apache.kafka.server.util.MockTime |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, Test} |
| |
| import scala.collection.{Iterable, Seq} |
| import scala.jdk.CollectionConverters._ |
| |
| /** |
| * An integration test that exercises the fully integrated log cleaner. |
| */ |
| class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest { |
| |
| val codec: CompressionType = CompressionType.LZ4 |
| |
| val time = new MockTime() |
| val topicPartitions = Array(new TopicPartition("log", 0), new TopicPartition("log", 1), new TopicPartition("log", 2)) |
| |
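| // gauges are registered in the process-wide Yammer registry, so clear it between tests |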
| @AfterEach |
| def cleanup(): Unit = { |
| TestUtils.clearYammerMetrics() |
| } |
| |
| @Test |
| def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics(): Unit = { |
| val largeMessageKey = 20 |
| val (_, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE, codec) |
| val maxMessageSize = largeMessageSet.sizeInBytes |
| cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize, backoffMs = 100) |
| |
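| // Write duplicate records, corrupt the newest segment, then append more records on |
| // top, leaving the cleaner with dirty data that it cannot clean through. |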
| def breakPartitionLog(tp: TopicPartition): Unit = { |
| val log = cleaner.logs.get(tp) |
| writeDups(numKeys = 20, numDups = 3, log = log, codec = codec) |
| |
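| // opening the segment file with PrintWriter truncates it and replaces its contents with garbage |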
| val partitionFile = log.logSegments.last.log.file() |
| val writer = new PrintWriter(partitionFile) |
| writer.write("jogeajgoea") |
| writer.close() |
| |
| writeDups(numKeys = 20, numDups = 3, log = log, codec = codec) |
| } |
| |
| breakPartitionLog(topicPartitions(0)) |
| breakPartitionLog(topicPartitions(1)) |
| |
| cleaner.startup() |
| |
| val log = cleaner.logs.get(topicPartitions(0)) |
| val log2 = cleaner.logs.get(topicPartitions(1)) |
| val uncleanableDirectory = log.dir.getParent |
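| // the uncleanable-* gauges are registered per log directory, with the directory as the metric scope |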
| val uncleanablePartitionsCountGauge = getGauge[Int]("uncleanable-partitions-count", uncleanableDirectory) |
| val uncleanableBytesGauge = getGauge[Long]("uncleanable-bytes", uncleanableDirectory) |
| |
| TestUtils.waitUntilTrue(() => uncleanablePartitionsCountGauge.value() == 2, "There should be 2 uncleanable partitions", 2000L) |
| val expectedTotalUncleanableBytes = LogCleanerManager.calculateCleanableBytes(log, 0, log.logSegments.last.baseOffset)._2 + |
| LogCleanerManager.calculateCleanableBytes(log2, 0, log2.logSegments.last.baseOffset)._2 |
| TestUtils.waitUntilTrue(() => uncleanableBytesGauge.value() == expectedTotalUncleanableBytes, |
| s"There should be $expectedTotalUncleanableBytes uncleanable bytes", 1000L) |
| |
| val uncleanablePartitions = cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory) |
| assertTrue(uncleanablePartitions.contains(topicPartitions(0))) |
| assertTrue(uncleanablePartitions.contains(topicPartitions(1))) |
| assertFalse(uncleanablePartitions.contains(topicPartitions(2))) |
| |
| // Delete one partition |
| cleaner.logs.remove(topicPartitions(0)) |
| TestUtils.waitUntilTrue( |
| () => { |
| // advance the mock clock so the cleaner thread wakes from its backoff and re-evaluates |
| time.sleep(1000) |
| uncleanablePartitionsCountGauge.value() == 1 |
| }, |
| "There should be 1 uncleanable partition", |
| 2000L) |
| |
| val uncleanablePartitions2 = cleaner.cleanerManager.uncleanablePartitions(uncleanableDirectory) |
| assertFalse(uncleanablePartitions2.contains(topicPartitions(0))) |
| assertTrue(uncleanablePartitions2.contains(topicPartitions(1))) |
| assertFalse(uncleanablePartitions2.contains(topicPartitions(2))) |
| } |
| |
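| // Find a gauge in the global Yammer registry. The overloads below match on the metric |
| // name suffix, optionally constrained to a scope (here, the parent log directory). |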
| private def getGauge[T](filter: MetricName => Boolean): Gauge[T] = { |
| KafkaYammerMetrics.defaultRegistry.allMetrics.asScala |
| .collectFirst { case (name, metric) if filter(name) => metric } |
| .getOrElse(fail("Unable to find a metric matching the given filter")) |
| .asInstanceOf[Gauge[T]] |
| } |
| |
| private def getGauge[T](metricName: String): Gauge[T] = { |
| getGauge(mName => mName.getName.endsWith(metricName) && mName.getScope == null) |
| } |
| |
| private def getGauge[T](metricName: String, metricScope: String): Gauge[T] = { |
| getGauge(mName => mName.getName.endsWith(metricName) && mName.getScope.endsWith(metricScope)) |
| } |
| |
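| // With min.cleanable.dirty.ratio pinned at 1.0, ratio-based compaction never triggers, |
| // so segments should only be cleaned once they are older than the max compaction lag. |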
| @Test |
| def testMaxLogCompactionLag(): Unit = { |
| val msPerHour = 60 * 60 * 1000 |
| |
| val minCompactionLagMs = 1 * msPerHour |
| val maxCompactionLagMs = 6 * msPerHour |
| |
| val cleanerBackOffMs = 200L |
| val segmentSize = 512 |
| val minCleanableDirtyRatio = 1.0F |
| |
| cleaner = makeCleaner(partitions = topicPartitions, |
| backoffMs = cleanerBackOffMs, |
| minCompactionLagMs = minCompactionLagMs, |
| segmentSize = segmentSize, |
| maxCompactionLagMs = maxCompactionLagMs, |
| minCleanableDirtyRatio = minCleanableDirtyRatio) |
| val log = cleaner.logs.get(topicPartitions(0)) |
| |
| val T0 = time.milliseconds |
| writeKeyDups(numKeys = 100, numDups = 3, log, CompressionType.NONE, timestamp = T0, startValue = 0, step = 1) |
| |
| val startSizeBlock0 = log.size |
| |
| val activeSegAtT0 = log.activeSegment |
| |
| cleaner.startup() |
| |
| // advance the mock clock to a time still less than maxCompactionLagMs from the start |
| time.sleep(maxCompactionLagMs / 2) |
| // real sleep: give the cleaner thread a chance to run and _not_ clean anything |
| Thread.sleep(5 * cleanerBackOffMs) |
| assertEquals(startSizeBlock0, log.size, "There should be no cleaning until the max compaction lag has passed") |
| |
| // advance to just past maxCompactionLagMs from the start |
| time.sleep(maxCompactionLagMs / 2 + 1) |
| val T1 = time.milliseconds |
| |
| // write the second block of data: the same keys, but all with value zero |
| val appends1 = writeKeyDups(numKeys = 100, numDups = 1, log, CompressionType.NONE, timestamp = T1, startValue = 0, step = 0) |
| |
| // roll the active segment |
| log.roll() |
| val activeSegAtT1 = log.activeSegment |
| val firstBlockCleanableSegmentOffset = activeSegAtT0.baseOffset |
| |
| // the first block should get cleaned |
| cleaner.awaitCleaned(new TopicPartition("log", 0), firstBlockCleanableSegmentOffset) |
| |
| val read1 = readFromLog(log) |
| val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0)) |
| assertTrue(lastCleaned >= firstBlockCleanableSegmentOffset, |
| s"log cleaner should have processed at least to offset $firstBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned") |
| |
| // minCleanableDirtyRatio (1.0) will prevent the second block of data from compacting |
| assertNotEquals(appends1, read1, "log should still contain records with non-zero values") |
| |
| time.sleep(maxCompactionLagMs + 1) |
| // the second block should get cleaned; only records with value zero should remain |
| cleaner.awaitCleaned(new TopicPartition("log", 0), activeSegAtT1.baseOffset) |
| |
| val read2 = readFromLog(log) |
| |
| assertEquals(appends1, read2, "log should now contain only records with value zero") |
| |
| val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints(new TopicPartition("log", 0)) |
| val secondBlockCleanableSegmentOffset = activeSegAtT1.baseOffset |
| assertTrue(lastCleaned2 >= secondBlockCleanableSegmentOffset, |
| s"log cleaner should have processed at least to offset $secondBlockCleanableSegmentOffset, but lastCleaned=$lastCleaned2") |
| } |
| |
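| // read back every (key, value) pair remaining in the log, across all segments |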
| private def readFromLog(log: UnifiedLog): Iterable[(Int, Int)] = { |
| for (segment <- log.logSegments; record <- segment.log.records.asScala) yield { |
| val key = TestUtils.readString(record.key).toInt |
| val value = TestUtils.readString(record.value).toInt |
| key -> value |
| } |
| } |
| |
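| // Append numDups rounds of numKeys keyed records at the given timestamp; the value |
| // starts at startValue and advances by step with each append (step = 0 repeats it). |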
| private def writeKeyDups(numKeys: Int, numDups: Int, log: UnifiedLog, codec: CompressionType, timestamp: Long, |
| startValue: Int, step: Int): Seq[(Int, Int)] = { |
| var valCounter = startValue |
| for (_ <- 0 until numDups; key <- 0 until numKeys) yield { |
| val curValue = valCounter |
| log.appendAsLeader(TestUtils.singletonRecords(value = curValue.toString.getBytes, codec = codec, |
| key = key.toString.getBytes, timestamp = timestamp), leaderEpoch = 0) |
| // advance the high watermark (and thus the last stable offset) so more of the log becomes cleanable |
| log.updateHighWatermark(log.logEndOffset) |
| valCounter += step |
| (key, curValue) |
| } |
| } |
| |
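| // Interrupting the cleaner threads simulates unexpected thread death; both the |
| // DeadThreadCount gauge and deadThreadCount should then count every cleaner thread. |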
| @Test |
| def testIsThreadFailed(): Unit = { |
| val metricName = "DeadThreadCount" |
| cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = 100000, backoffMs = 100) |
| cleaner.startup() |
| assertEquals(0, cleaner.deadThreadCount) |
| // we simulate the unexpected error with an interrupt |
| cleaner.cleaners.foreach(_.interrupt()) |
| // wait until interruption is propagated to all the threads |
| TestUtils.waitUntilTrue( |
| () => cleaner.cleaners.forall(_.isThreadFailed), |
| "Cleaner threads did not fail after being interrupted" |
| ) |
| assertEquals(cleaner.cleaners.size, getGauge[Int](metricName).value()) |
| assertEquals(cleaner.cleaners.size, cleaner.deadThreadCount) |
| } |
| } |