package kafka.log
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.log.remote.RemoteIndexCache
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import org.mockito.Mockito.{doAnswer, doNothing, mock, never, spy, times, verify}
import java.nio.file.Files
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
import java.util.{Collections, Properties}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
import{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import scala.collection.{Map, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Try}
class LogManagerTest {
// Shared fixture state used by every test in this class.
val time = new MockTime()
val maxRollInterval = 100
// Retention age applied through logProps below: ten minutes, in milliseconds.
val maxLogAgeMs = 10 * 60 * 1000
// Default topic-level properties: 1 KiB segments, 4 KiB index files, time-based retention of
// maxLogAgeMs and an effectively unlimited message-timestamp difference.
val logProps = {
  val props = new Properties()
  props.put(TopicConfig.SEGMENT_BYTES_CONFIG, Int.box(1024))
  props.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, Int.box(4096))
  props.put(TopicConfig.RETENTION_MS_CONFIG, Int.box(maxLogAgeMs))
  props.put(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, Long.MaxValue.toString)
  props
}
val logConfig = new LogConfig(logProps)
// Both are assigned in setUp(); individual tests may replace logManager with their own instance.
var logDir: File = _
var logManager: LogManager = _
// Topic name used by most tests.
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
// Per-test fixture setup: allocates a fresh temp log directory and builds the LogManager under
// test with createLogManager()'s defaults.
// NOTE(review): the closing brace (and any further setup steps) are not visible in this view.
def setUp(): Unit = {
logDir = TestUtils.tempDir()
logManager = createLogManager()
// Per-test fixture teardown.
// NOTE(review): the bodies of both null-guards are missing from this view, so what is done with
// logManager here (presumably shutdown and directory cleanup) cannot be confirmed.
def tearDown(): Unit = {
if (logManager != null)
// Some tests assign a new LogManager
if (logManager != null)
/**
 * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
 */
// Creates partition 0 of topic `name` via getOrCreateLog, checks that exactly one log directory
// is live, and appends a single record to the new log.
def testCreateLog(): Unit = {
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
assertEquals(1, logManager.liveLogDirs.size)
// NOTE(review): logFile is built but no assertion on it is visible in this view.
val logFile = new File(logDir, name + "-0")
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
/**
 * Tests that all internal futures are completed before LogManager.shutdown() returns to the
 * caller during error situations.
 */
// Builds a LogManager over two temp directories, appends two records to one log in each, and
// then checks which directory ends up with a clean-shutdown marker file.
// NOTE(review): the statements that make log1.close() fail and that call shutdown() are not
// visible in this view, nor are the closing braces of the try/finally.
def testHandlingExceptionsDuringShutdown(): Unit = {
// We create two directories logDir1 and logDir2 to help effectively test error handling
// during LogManager.shutdown().
val logDir1 = TestUtils.tempDir()
val logDir2 = TestUtils.tempDir()
// Held as an Option so the finally block can clean up only if construction succeeded.
var logManagerForTest: Option[LogManager] = Option.empty
try {
logManagerForTest = Some(createLogManager(Seq(logDir1, logDir2)))
assertEquals(2, logManagerForTest.get.liveLogDirs.size)
val log1 = logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
val log2 = logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 1), topicId = None)
// NOTE(review): logFile1/logFile2 are not referenced again in the lines visible here.
val logFile1 = new File(logDir1, name + "-0")
val logFile2 = new File(logDir2, name + "-1")
log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
log1.appendAsLeader(TestUtils.singletonRecords("test1".getBytes()), leaderEpoch = 0)
log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
log2.appendAsLeader(TestUtils.singletonRecords("test2".getBytes()), leaderEpoch = 0)
// This should cause log1.close() to fail during LogManager shutdown sequence.
// Expected outcome: no clean-shutdown marker in logDir1, but one in logDir2.
assertFalse(Files.exists(new File(logDir1, LogLoader.CleanShutdownFile).toPath))
assertTrue(Files.exists(new File(logDir2, LogLoader.CleanShutdownFile).toPath))
} finally {
// Delete every live log directory of the manager created above, if one was created.
logManagerForTest.foreach(manager => manager.liveLogDirs.foreach(Utils.delete))
/**
 * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
 * The LogManager is configured with one invalid log directory, which should be marked as offline.
 */
// Builds a LogManager over the normal logDir plus an unusable NUL-character path, then creates
// partition 0 with isNew = true and appends one record to it.
// NOTE(review): assertions on the live-directory count and on logFile are not visible here.
def testCreateLogWithInvalidLogDir(): Unit = {
// Configure the log dir with the Nul character as the path, which causes dir.getCanonicalPath() to throw an
// IOException. This simulates the scenario where the disk is not properly mounted (which is hard to achieve in
// a unit test)
val dirs = Seq(logDir, new File("\u0000"))
logManager = createLogManager(dirs)
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), isNew = true, topicId = None)
val logFile = new File(logDir, name + "-0")
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
// Spies on a LogManager so that createLogDirectory returns a Failure for some directories, then
// verifies that getOrCreateLog still ends up creating exactly one log directory.
// NOTE(review): several lines are missing from this view: the rest of the `dirs` expression, the
// success branch of the stub (including whatever records a directory as broken), and the final
// assertion announced by the last comment.
def testCreateLogWithLogDirFallback(): Unit = {
// Configure a number of directories one level deeper in logDir,
// so they all get cleaned up in tearDown().
val dirs = (0 to 4)
// Create a new LogManager with the configured directories and an overridden createLogDirectory.
logManager = spy(createLogManager(dirs))
// Directories for which the stub has reported a failure.
val brokenDirs = mutable.Set[File]()
doAnswer { invocation =>
// The first half of directories tried will fail, the rest goes through.
val logDir = invocation.getArgument[File](0)
// Fail for a directory already marked broken, or while fewer than half are marked broken.
if (brokenDirs.contains(logDir) || brokenDirs.size < dirs.length / 2) {
Failure(new Throwable("broken dir"))
} else {
}.when(logManager).createLogDirectory(any(), any())
// Request creating a new log.
// LogManager should try using all configured log directories until one succeeds.
logManager.getOrCreateLog(new TopicPartition(name, 0), isNew = true, topicId = None)
// Verify that half the directories were considered broken,
assertEquals(dirs.length / 2, brokenDirs.size)
// and that exactly one log file was created,
val containsLogFile: File => Boolean = dir => new File(dir, name + "-0").exists()
assertEquals(1, dirs.count(containsLogFile), "More than one log file created")
// and that it wasn't created in one of the broken directories.
/**
 * Test that getLog on a non-existent log returns None and no log is created.
 */
// Looks up partition 0 with getLog without creating it first and expects None.
def testGetNonExistentLog(): Unit = {
val log = logManager.getLog(new TopicPartition(name, 0))
assertEquals(None, log, "No log should be found.")
// NOTE(review): logFile is constructed but the assertion that uses it is not visible here.
val logFile = new File(logDir, name + "-0")
/**
 * Test time-based log cleanup. First append messages, then set the time into the future and run cleanup.
 */
// Appends 200 single-record batches so the log rolls into several segments, advances the mock
// clock past the retention age, and checks that only one segment (and its files) remains.
// NOTE(review): the loop's closing brace, the call that triggers cleanup, and the body of the
// logSegments.foreach are not visible in this view.
def testCleanupExpiredSegments(): Unit = {
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
// Tracks the last offset written so the fetch checks below can read just past it.
var offset = 0L
for(_ <- 0 until 200) {
val set = TestUtils.singletonRecords("test".getBytes())
val info = log.appendAsLeader(set, leaderEpoch = 0)
offset = info.lastOffset
assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.")
// Move the mock clock just beyond maxLogAgeMs, the retention.ms configured in logProps.
time.sleep(maxLogAgeMs + 1)
assertEquals(1, log.numberOfSegments, "Now there should only be only one segment in the index.")
// Wait out the file-delete delay before inspecting the directory contents.
time.sleep(log.config.fileDeleteDelayMs + 1)
log.logSegments.foreach(s => {
// there should be a log file, two indexes, one producer snapshot, and the leader epoch checkpoint
assertEquals(log.numberOfSegments * 4 + 1, log.dir.list.length, "Files should have been deleted")
// Reading past the last offset yields no data; reading offset 0 is expected to be out of range.
assertEquals(0, readLog(log, offset + 1).records.sizeInBytes, "Should get empty fetch off new log.")
assertThrows(classOf[OffsetOutOfRangeException], () => readLog(log, 0), () => "Should get exception from fetching earlier.")
// log should still be appendable
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
/**
 * Test size-based cleanup. Append messages, then run cleanup and check that segments are deleted.
 */
// Configures topic `name` with segments sized for ten records and a retention.bytes budget of
// five such segments plus 10 bytes, appends 200 records, and checks that size-based cleanup
// leaves six segments.
// NOTE(review): the loop's closing brace and the call that triggers cleanup are not visible here.
def testCleanupSegmentsToMaintainSize(): Unit = {
// Size in bytes of one single-record batch; all sizing below is derived from it.
val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes
val segmentBytes = 10 * setSize
val properties = new Properties()
properties.put(TopicConfig.SEGMENT_BYTES_CONFIG, segmentBytes.toString)
properties.put(TopicConfig.RETENTION_BYTES_CONFIG, (5L * 10L * setSize + 10L).toString)
val configRepository = MockConfigRepository.forTopic(name, properties)
// Replace the fixture's manager with one that sees the per-topic overrides above.
logManager = createLogManager(configRepository = configRepository)
// create a log
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
val numMessages = 200
for (_ <- 0 until numMessages) {
val set = TestUtils.singletonRecords("test".getBytes())
val info = log.appendAsLeader(set, leaderEpoch = 0)
offset = info.firstOffset.get.messageOffset
// With ten records per segment, 200 records are expected to occupy 200 / 10 segments.
assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.")
// this cleanup shouldn't find any expired segments but should delete some to reduce size
assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments")
time.sleep(log.config.fileDeleteDelayMs + 1)
// there should be a log file, two indexes (the txn index is created lazily),
// and a producer snapshot file per segment, and the leader epoch checkpoint.
assertEquals(log.numberOfSegments * 4 + 1, log.dir.list.length, "Files should have been deleted")
assertEquals(0, readLog(log, offset + 1).records.sizeInBytes, "Should get empty fetch off new log.")
assertThrows(classOf[OffsetOutOfRangeException], () => readLog(log, 0))
// log should still be appendable
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
/**
 * Ensures that LogManager doesn't run on logs with cleanup.policy=compact,delete.
 * LogCleaner.CleanerThread handles all logs where compaction is enabled.
 */
// Runs the shared testDoesntCleanLogs scenario with the combined "compact,delete" policy string.
def testDoesntCleanLogsWithCompactDeletePolicy(): Unit = {
testDoesntCleanLogs(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
/**
 * Ensures that LogManager doesn't run on logs with cleanup.policy=compact.
 * LogCleaner.CleanerThread handles all logs where compaction is enabled.
 */
// NOTE(review): the body of this test is missing from this view; by analogy with the
// compact,delete variant it presumably calls testDoesntCleanLogs with the compact policy. Confirm.
def testDoesntCleanLogsWithCompactPolicy(): Unit = {
// Shared scenario: with the given cleanup.policy applied to topic `name`, append 200 keyed
// records, advance the clock past the retention age, and assert the segment count is unchanged.
// `policy` is the value stored under TopicConfig.CLEANUP_POLICY_CONFIG.
// NOTE(review): the loop's closing brace and the call that triggers cleanup are not visible here.
private def testDoesntCleanLogs(policy: String): Unit = {
val configRepository = MockConfigRepository.forTopic(name, TopicConfig.CLEANUP_POLICY_CONFIG, policy)
logManager = createLogManager(configRepository = configRepository)
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
// NOTE(review): offset is assigned in the loop but not read in the lines visible here.
var offset = 0L
for (_ <- 0 until 200) {
// Records carry a key, unlike the other cleanup tests in this class.
val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
val info = log.appendAsLeader(set, leaderEpoch = 0)
offset = info.lastOffset
// Snapshot the segment count before time advances so it can be compared afterwards.
val numSegments = log.numberOfSegments
assertTrue(log.numberOfSegments > 1, "There should be more than one segment now.")
time.sleep(maxLogAgeMs + 1)
assertEquals(numSegments, log.numberOfSegments, "number of segments shouldn't have changed")
/**
 * Test that flush is invoked by the background scheduler thread.
 */
// Sets flush.ms to 1000 for topic `name`, appends 200 records, and asserts that the log's
// lastFlushTime has changed from its initial value.
// NOTE(review): the loop's closing brace and the steps that start the manager and advance the
// clock are not visible in this view.
def testTimeBasedFlush(): Unit = {
val configRepository = MockConfigRepository.forTopic(name, TopicConfig.FLUSH_MS_CONFIG, "1000")
logManager = createLogManager(configRepository = configRepository)
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
// Baseline flush time, captured before any records are appended.
val lastFlush = log.lastFlushTime
for (_ <- 0 until 200) {
val set = TestUtils.singletonRecords("test".getBytes())
log.appendAsLeader(set, leaderEpoch = 0)
assertTrue(lastFlush != log.lastFlushTime, "Time based flush should have been triggered")
/**
 * Test that newly created logs are assigned to the least loaded log directory.
 */
// Creates 20 partitions of topic "test" one at a time over several data directories and, after
// each creation, checks the total log count and that the per-directory load stays balanced.
// NOTE(review): the `dirs` expression is cut off after its first element, the loop's closing
// brace is missing, and the reduction of `counts` to per-directory sizes is not visible here.
def testLeastLoadedAssignment(): Unit = {
// create a log manager with multiple data directories
val dirs = Seq(TestUtils.tempDir(),
logManager = createLogManager(dirs)
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
logManager.getOrCreateLog(new TopicPartition("test", partition), topicId = None)
assertEquals(partition + 1, logManager.allLogs.size, "We should have created the right number of logs")
// Group the logs by the parent directory they live in.
val counts = logManager.allLogs.groupBy(_.dir.getParent)
// Balanced means the fullest directory holds at most one log more than the emptiest.
assertTrue(counts.max <= counts.min + 1, "Load should balance evenly")
/**
 * Tests that the log manager skips the remote-log-index-cache directory when loading the logs from disk.
 */
// Builds a LogManager over a fresh directory and calls loadLogs with the default config and no
// topic overrides.
// NOTE(review): remoteIndexCache is only constructed as a File here; the step that creates it
// on disk and any assertions after loadLogs are not visible in this view.
def testLoadLogsSkipRemoteIndexCache(): Unit = {
// Local logDir intentionally shadows the fixture field of the same name.
val logDir = TestUtils.tempDir()
val remoteIndexCache = new File(logDir, RemoteIndexCache.DirName)
logManager = createLogManager(Seq(logDir))
logManager.loadLogs(logConfig, Map.empty)
/**
 * Test that it is not possible to open two log managers using the same data directory.
 */
// setUp() already created a LogManager over logDir, so creating a second one over the same
// default directory is expected to throw KafkaException.
def testTwoLogManagersUsingSameDirFails(): Unit = {
assertThrows(classOf[KafkaException], () => createLogManager())
/**
 * Test that recovery points are correctly written out to disk.
 */
// Runs verifyCheckpointRecovery for partitions test-a-1 and test-b-1 against the fixture
// manager and its log directory.
def testCheckpointRecoveryPoints(): Unit = {
verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1), new TopicPartition("test-b", 1)), logManager, logDir)
/**
 * Test that recovery points directory checking works with a trailing slash.
 */
// Builds a manager whose single log directory path ends in File.separator, then runs the
// checkpoint-recovery check against the manager's first live directory.
// NOTE(review): this calls TestUtils.createLogManager directly rather than the local
// createLogManager helper, so the fixture's logConfig/time defaults are not passed here.
def testRecoveryDirectoryMappingWithTrailingSlash(): Unit = {
logManager = TestUtils.createLogManager(logDirs = Seq(new File(TestUtils.tempDir().getAbsolutePath + File.separator)))
verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head)
/**
 * Test that recovery points directory checking works with a relative directory.
 */
// Builds a manager over "data/<logDir name>" resolved to an absolute file, then runs the
// checkpoint-recovery check against the manager's first live directory.
def testRecoveryDirectoryMappingWithRelativeDirectory(): Unit = {
logManager = createLogManager(Seq(new File("data", logDir.getName).getAbsoluteFile))
verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head)
// Helper: appends 50 records to a log for each given partition, reads the recovery-point
// checkpoint file from `logDir`, and asserts each log's recoveryPoint equals its checkpoint.
//   topicPartitions - partitions to exercise
//   logManager      - manager that owns the logs (shadows the fixture field)
//   logDir          - directory holding the recovery-point checkpoint file
// NOTE(review): the `val logs` expression and the line pairing checkpoints with logs are cut
// off in this view, and the step that writes the checkpoint is not visible.
private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager, logDir: File): Unit = {
val logs =, topicId = None))
logs.foreach { log =>
for (_ <- 0 until 50)
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read() { case (tp, log) =>
assertEquals(checkpoints(tp), log.recoveryPoint, "Recovery point should equal checkpoint")
// Factory for the LogManager under test, wired to this fixture's logConfig and mock clock.
//   logDirs                   - data directories; defaults to the fixture's logDir
//   configRepository          - per-topic config source; defaults to a fresh MockConfigRepository
//   recoveryThreadsPerDataDir - recovery threads per directory; defaults to 1
// NOTE(review): the callee that receives these named arguments is missing from this view
// (presumably TestUtils.createLogManager). Confirm.
private def createLogManager(logDirs: Seq[File] = Seq(this.logDir),
configRepository: ConfigRepository = new MockConfigRepository,
recoveryThreadsPerDataDir: Int = 1): LogManager = {
defaultConfig = logConfig,
configRepository = configRepository,
logDirs = logDirs,
time = this.time,
recoveryThreadsPerDataDir = recoveryThreadsPerDataDir)
// Checks that after asyncDelete the removed log's active segment points at files under the
// removed log's directory, and that the log stays pending deletion until the delete delay passes.
// NOTE(review): the index-file Seq, the lookup assertion and the assertNotEquals call are cut
// off in this view, as is the closing brace of the foreach.
def testFileReferencesAfterAsyncDelete(): Unit = {
val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None)
val activeSegment = log.activeSegment
// Capture the file names before deletion; only the parent directory should change afterwards.
val logName = activeSegment.log.file.getName
val indexName = activeSegment.offsetIndex.file.getName
val timeIndexName = activeSegment.timeIndex.file.getName
val txnIndexName = activeSegment.txnIndex.file.getName
// Every file in the segment's directory whose name ends with "index".
val indexFilesOnDiskBeforeDelete = activeSegment.log.file.getParentFile.listFiles.filter(_.getName.endsWith("index"))
val removedLog = logManager.asyncDelete(new TopicPartition(name, 0)).get
val removedSegment = removedLog.activeSegment
val indexFilesAfterDelete = Seq(removedSegment.lazyOffsetIndex.file, removedSegment.lazyTimeIndex.file,
// Each file reference should now resolve under removedLog.dir with its original name.
assertEquals(new File(removedLog.dir, logName), removedSegment.log.file)
assertEquals(new File(removedLog.dir, indexName), removedSegment.lazyOffsetIndex.file)
assertEquals(new File(removedLog.dir, timeIndexName), removedSegment.lazyTimeIndex.file)
assertEquals(new File(removedLog.dir, txnIndexName), removedSegment.txnIndex.file)
// Try to detect the case where a new index type was added and we forgot to update the pointer
// This will only catch cases where the index file is created eagerly instead of lazily
indexFilesOnDiskBeforeDelete.foreach { fileBeforeDelete =>
val fileInIndex = indexFilesAfterDelete.find(_.getName == fileBeforeDelete.getName)
s"Could not find index file ${fileBeforeDelete.getName} in indexFilesAfterDelete")
assertNotEquals("File reference was not updated in index", fileBeforeDelete.getAbsolutePath,
// Before the delete delay elapses the log must still be queued for deletion.
assertTrue(logManager.hasLogsToBeDeleted, "Logs deleted too early")
time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
assertFalse(logManager.hasLogsToBeDeleted, "Logs not deleted")
// Creates and then asynchronously deletes partition 0 of a topic whose name is 253 'x'
// characters long; the visible lines contain no assertions, so the test passes if nothing throws.
def testCreateAndDeleteOverlyLongTopic(): Unit = {
val invalidTopicName = String.join("", Collections.nCopies(253, "x"))
logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0), topicId = None)
logManager.asyncDelete(new TopicPartition(invalidTopicName, 0))
// Creates logs for five partitions across topics test-a and test-b, appends 50 records to each,
// and compares each log's recoveryPoint with the value read from the checkpoint file in logDir.
// NOTE(review): the `val allLogs` expression, the checkpoint call under test and the line that
// pairs checkpoints with logs are cut off in this view.
def testCheckpointForOnlyAffectedLogs(): Unit = {
val tps = Seq(
new TopicPartition("test-a", 0),
new TopicPartition("test-a", 1),
new TopicPartition("test-a", 2),
new TopicPartition("test-b", 0),
new TopicPartition("test-b", 1))
val allLogs =, topicId = None))
allLogs.foreach { log =>
for (_ <- 0 until 50)
log.appendAsLeader(TestUtils.singletonRecords("test".getBytes), leaderEpoch = 0)
val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read() { case (tp, log) =>
assertEquals(checkpoints(tp), log.recoveryPoint,
"Recovery point should equal checkpoint")
// Helper: reads from `log` starting at `offset`, returning at most `maxLength` bytes
// (default 1024), with LOG_END isolation and minOneMessage = true.
// NOTE(review): the callee and its first argument are missing from this line in this view
// (presumably log.read(offset, ...)). Confirm.
private def readLog(log: UnifiedLog, offset: Long, maxLength: Int = 1024): FetchDataInfo = {, maxLength, isolation = FetchIsolation.LOG_END, minOneMessage = true)
/**
 * Test that when the configuration of a topic is updated while its log is being initialized,
 * the config is refreshed once log initialization has finished.
 */
// Finishes log initialization for two topic partitions on a spied LogManager and verifies that
// the config repository is queried once for test-topic-one and never for test-topic-two.
// NOTE(review): the calls that mark the partitions as initializing and that deliver the config
// update for test-topic-one are not visible in this view.
def testTopicConfigChangeUpdatesLogConfig(): Unit = {
// Spy on the repository so topicConfig lookups can be counted.
val spyConfigRepository = spy(new MockConfigRepository)
logManager = createLogManager(configRepository = spyConfigRepository)
val spyLogManager = spy(logManager)
val mockLog = mock(classOf[UnifiedLog])
val testTopicOne = "test-topic-one"
val testTopicTwo = "test-topic-two"
val testTopicOnePartition = new TopicPartition(testTopicOne, 1)
val testTopicTwoPartition = new TopicPartition(testTopicTwo, 1)
spyLogManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog))
spyLogManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog))
// testTopicOne configs loaded again due to the update
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(testTopicOnePartition), ArgumentMatchers.any())
verify(spyConfigRepository, times(1)).topicConfig(testTopicOne)
// testTopicTwo configs not loaded again since there was no update
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(testTopicTwoPartition), ArgumentMatchers.any())
verify(spyConfigRepository, never).topicConfig(testTopicTwo)
/**
 * Test that if an error occurs when creating a log, the log manager removes the corresponding
 * topic partition from the list of initializing partitions and no configs are retrieved.
 */
// Finishes initialization with no log (None) for one partition and verifies that the spied
// config repository was never asked for that topic's config.
// NOTE(review): the call that marks the partition as initializing and any assertion on the
// initializing-partition list are not visible in this view.
def testConfigChangeGetsCleanedUp(): Unit = {
val spyConfigRepository = spy(new MockConfigRepository)
logManager = createLogManager(configRepository = spyConfigRepository)
val spyLogManager = spy(logManager)
val testTopicPartition = new TopicPartition("test-topic", 1)
// None stands for "log creation failed".
spyLogManager.finishedInitializingLog(testTopicPartition, None)
verify(spyConfigRepository, never).topicConfig(testTopicPartition.topic)
/**
 * Test that when a broker configuration change happens, all logs in the process of initialization
 * pick up the latest config when they finish initialization.
 */
// Finishes log initialization for two topic partitions on a spied LogManager and verifies that
// the config repository is queried exactly once for each of the two topics.
// NOTE(review): the calls that mark the partitions as initializing and that deliver the broker
// config change are not visible in this view.
def testBrokerConfigChangeDeliveredToAllLogs(): Unit = {
val spyConfigRepository = spy(new MockConfigRepository)
logManager = createLogManager(configRepository = spyConfigRepository)
val spyLogManager = spy(logManager)
val mockLog = mock(classOf[UnifiedLog])
val testTopicOne = "test-topic-one"
val testTopicTwo = "test-topic-two"
val testTopicOnePartition = new TopicPartition(testTopicOne, 1)
val testTopicTwoPartition = new TopicPartition(testTopicTwo, 1)
spyLogManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog))
spyLogManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog))
// Unlike the topic-level test, both topics are expected to have their config reloaded.
verify(spyConfigRepository, times(1)).topicConfig(testTopicOne)
verify(spyConfigRepository, times(1)).topicConfig(testTopicTwo)
/**
 * Test that when compact is removed from the cleanup policy, cleaning of the partitions is aborted.
 */
// Creates two partitions of one topic on a spied LogManager, switches the topic's cleanup
// policy to delete via updateTopicConfig, and verifies topicConfigUpdated is called once and
// abortCleaning once per partition.
// NOTE(review): oldLogConfig is built with the compact policy but the lines applying it to the
// logs are not visible in this view.
def testTopicConfigChangeStopCleaningIfCompactIsRemoved(): Unit = {
logManager = createLogManager(configRepository = new MockConfigRepository)
val spyLogManager = spy(logManager)
val topic = "topic"
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
// Starting policy: compact.
val oldProperties = new Properties()
oldProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val oldLogConfig = LogConfig.fromProps(logConfig.originals, oldProperties)
val log0 = spyLogManager.getOrCreateLog(tp0, topicId = None)
val log1 = spyLogManager.getOrCreateLog(tp1, topicId = None)
assertEquals(Set(log0, log1), spyLogManager.logsByTopic(topic).toSet)
// New policy: delete, i.e. compaction is removed.
val newProperties = new Properties()
newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
spyLogManager.updateTopicConfig(topic, newProperties)
verify(spyLogManager, times(1)).topicConfigUpdated(topic)
verify(spyLogManager, times(1)).abortCleaning(tp0)
verify(spyLogManager, times(1)).abortCleaning(tp1)
/**
 * Test that config change events are handled correctly even if no log is being initialized.
 * This test should not throw.
 * This makes sure that events can be delivered even when no log is getting initialized.
 */
// NOTE(review): the body of this test is missing from this view, so the config-change calls it
// makes cannot be confirmed here.
def testConfigChangesWithNoLogGettingInitialized(): Unit = {
// Helper: creates a log for partition `partitionId` of topic `name` under `parentLogDir` and
// appends enough records to produce `expectedSegmentsPerLog` segments.
//   time                   - clock used for record timestamps and for the log's scheduler
//   parentLogDir           - data directory in which the partition directory is created
//   partitionId            - partition number used in the directory name
//   brokerTopicStats       - stats object handed to the created log
//   expectedSegmentsPerLog - target segment count; must be > 0
// NOTE(review): the closing braces of the loop and try, and the body of the finally block, are
// not visible in this view.
private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = {
// A def, not a val: each call builds a new record stamped with the current mock time.
def createRecord = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val tpFile = new File(parentLogDir, s"$name-$partitionId")
// Mirrors the segment.bytes value of 1024 set in logProps.
val segmentBytes = 1024
val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0,
5 * 60 * 1000, new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs)
assertTrue(expectedSegmentsPerLog > 0)
// calculate numMessages to append to logs. It'll create "expectedSegmentsPerLog" log segments with segment.bytes=1024
// NOTE(review): if sizeInBytes is an Int, the division is already integer division and
// Math.floor has no further effect. Confirm.
val numMessages = Math.floor(segmentBytes * expectedSegmentsPerLog / createRecord.sizeInBytes).asInstanceOf[Int]
try {
for (_ <- 0 until numMessages) {
log.appendAsLeader(createRecord, leaderEpoch = 0)
assertEquals(expectedSegmentsPerLog, log.numberOfSegments)
} finally {
// Helper: checks the `remainingLogsToRecover` gauges registered for LogManager.
//   spyLogManager  - spied manager whose decNumRemainingLogs calls are inspected
//   expectedParams - expected number of logs per log-directory path
// Verifies one gauge per expected path, that decNumRemainingLogs was called once per log with
// the matching path, and that every gauge ends at 0.
// NOTE(review): the conversion of the filtered metrics to an ArrayBuffer[Gauge[Int]] and the
// closing brace of the foreach are not visible in this view.
private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = {
val logManagerClassName = classOf[LogManager].getSimpleName
// get all `remainingLogsToRecover` metrics
val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
.filter { case (metric, _) => metric.getType == s"$logManagerClassName" && metric.getName == "remainingLogsToRecover" }
.map { case (_, gauge) => gauge }
assertEquals(expectedParams.size, logMetrics.size)
// Captures the path argument of every decNumRemainingLogs call.
val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
// One decrement per log, summed over all directories.
val expectedCallTimes = expectedParams.values.sum
verify(spyLogManager, times(expectedCallTimes)).decNumRemainingLogs(any[ConcurrentMap[String, Int]], capturedPath.capture());
val paths = capturedPath.getAllValues
expectedParams.foreach {
case (path, totalLogs) =>
// make sure each path is called "totalLogs" times, which means it is decremented to 0 in the end
assertEquals(totalLogs, Collections.frequency(paths, path))
// expected the end value is 0
logMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
// Helper: checks the `remainingSegmentsToRecover` gauges registered for LogManager.
//   spyLogManager             - spied manager (not referenced in the lines visible here)
//   logDirs                   - data directories the manager was created with
//   recoveryThreadsPerDataDir - recovery threads per directory
//   mockMap                   - mocked map whose put(threadName, remaining) calls are inspected
//   expectedParams            - expected total segments, keyed by a thread-name fragment
// Verifies one gauge per recovery thread, the total number of put calls, that every gauge ends
// at 0, and that each thread's recorded values count down from totalSegments to 0.
// NOTE(review): the metric-to-gauge conversion and the head of the expectedCallTimes expression
// are cut off in this view, as are the closing braces of the nested loops.
private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager,
logDirs: Seq[File],
recoveryThreadsPerDataDir: Int,
mockMap: ConcurrentHashMap[String, Int],
expectedParams: Map[String, Int]): Unit = {
val logManagerClassName = classOf[LogManager].getSimpleName
// get all `remainingSegmentsToRecover` metrics
val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
.filter { case (metric, _) => metric.getType == s"$logManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
.map { case (_, gauge) => gauge }
// expected each log dir has 1 metrics for each thread
assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size)
val capturedThreadName: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String])
val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int])
// Since we'll update numRemainingSegments from totalSegments to 0 for each thread, so we need to add 1 here
val expectedCallTimes = num => num + 1 ).sum
verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), capturedNumRemainingSegments.capture());
// expected the end value is 0
logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) }
// The two captured lists are index-aligned: entry i of each belongs to the same put call.
val threadNames = capturedThreadName.getAllValues
val numRemainingSegments = capturedNumRemainingSegments.getAllValues
expectedParams.foreach {
case (threadName, totalSegments) =>
// make sure we update the numRemainingSegments from totalSegments to 0 in order for each thread
// Starts one above totalSegments because it is decremented before each comparison.
var expectedCurRemainingSegments = totalSegments + 1
for (i <- 0 until threadNames.size) {
// Substring match: the captured thread name only needs to contain the expected key.
if (threadNames.get(i).contains(threadName)) {
expectedCurRemainingSegments -= 1
assertEquals(expectedCurRemainingSegments, numRemainingSegments.get(i))
assertEquals(0, expectedCurRemainingSegments)
// Helper: collects the names of any `remainingLogsToRecover` and `remainingSegmentsToRecover`
// metrics still registered, filtered by the runtime class name of `spyLogManager`.
// NOTE(review): the assertions on the two collections are not visible in this view.
// NOTE(review): the sibling verify* helpers filter on classOf[LogManager].getSimpleName, whereas
// this one uses spyLogManager.getClass. For a Mockito spy that is presumably a generated
// subclass name, so these filters may never match and an emptiness check would pass vacuously.
// Confirm.
private def verifyLogRecoverMetricsRemoved(spyLogManager: LogManager): Unit = {
val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
// get all `remainingLogsToRecover` metrics
// Declared as a def (re-evaluated on each use), unlike the val below.
def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
.filter { metric => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
// get all `remainingSegmentsToRecover` metrics
val logSegmentMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
.filter { metric => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
// End-to-end check of the log-recovery metrics: writes two-segment logs into two data
// directories, stubs loadLog on a spied LogManager so recovery runs as after an unclean
// shutdown with a mock progress map, and then verifies the remaining-logs and
// remaining-segments gauges.
// NOTE(review): several lines are missing from this view: the callee inside the doAnswer stub
// that takes the named arguments, the stub for removeLogRecoveryMetrics, and the call that
// starts the manager.
def testLogRecoveryMetrics(): Unit = {
val logDir1 = TestUtils.tempDir()
val logDir2 = TestUtils.tempDir()
val logDirs = Seq(logDir1, logDir2)
val recoveryThreadsPerDataDir = 2
// create logManager with expected recovery thread number
logManager = createLogManager(logDirs, recoveryThreadsPerDataDir = recoveryThreadsPerDataDir)
val spyLogManager = spy(logManager)
assertEquals(2, spyLogManager.liveLogDirs.size)
// A separate clock from the fixture's `time`, used for the logs written and loaded below.
val mockTime = new MockTime()
// Mocked so the put() calls recording remaining segments can be verified afterwards.
val mockMap = mock(classOf[ConcurrentHashMap[String, Int]])
val mockBrokerTopicStats = mock(classOf[BrokerTopicStats])
val expectedSegmentsPerLog = 2
// create log segments for log recovery in each log dir
appendRecordsToLog(mockTime, logDir1, 0, mockBrokerTopicStats, expectedSegmentsPerLog)
appendRecordsToLog(mockTime, logDir2, 1, mockBrokerTopicStats, expectedSegmentsPerLog)
// intercept loadLog method to pass expected parameter to do log recovery
doAnswer { invocation =>
// Argument 0 is the partition directory; argument 5 holds the per-topic config overrides.
val dir: File = invocation.getArgument(0)
val topicConfigOverrides: mutable.Map[String, LogConfig] = invocation.getArgument(5)
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
// Fall back to the fixture's default config when the topic has no override.
val config = topicConfigOverrides.getOrElse(topicPartition.topic, logConfig)
dir = dir,
config = config,
logStartOffset = 0,
recoveryPoint = 0,
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = new ProducerStateManagerConfig(5 * 60 * 1000, false),
producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
scheduler = mockTime.scheduler,
time = mockTime,
brokerTopicStats = mockBrokerTopicStats,
logDirFailureChannel = mock(classOf[LogDirFailureChannel]),
// not clean shutdown
lastShutdownClean = false,
topicId = None,
keepPartitionMetadataFile = false,
// pass mock map for verification later
numRemainingSegments = mockMap)
}.when(spyLogManager).loadLog(any[File], any[Boolean], any[Map[TopicPartition, Long]], any[Map[TopicPartition, Long]],
any[LogConfig], any[Map[String, LogConfig]], any[ConcurrentMap[String, Int]])
// do nothing for removeLogRecoveryMetrics for metrics verification
// start the logManager to do log recovery
// make sure log recovery metrics are added and removed
verify(spyLogManager, times(1)).addLogRecoveryMetrics(any[ConcurrentMap[String, Int]], any[ConcurrentMap[String, Int]])
verify(spyLogManager, times(1)).removeLogRecoveryMetrics()
// expected 1 log in each log dir since we created 2 partitions with 2 log dirs
val expectedRemainingLogsParams = Map[String, Int](logDir1.getAbsolutePath -> 1, logDir2.getAbsolutePath -> 1)
verifyRemainingLogsToRecoverMetric(spyLogManager, expectedRemainingLogsParams)
// Keyed by directory path; the helper matches keys as substrings of captured thread names.
val expectedRemainingSegmentsParams = Map[String, Int](
logDir1.getAbsolutePath -> expectedSegmentsPerLog, logDir2.getAbsolutePath -> expectedSegmentsPerLog)
verifyRemainingSegmentsToRecoverMetric(spyLogManager, logDirs, recoveryThreadsPerDataDir, mockMap, expectedRemainingSegmentsParams)
// Verifies on a spied LogManager over two data directories that the recovery metrics are added
// exactly once and removed exactly once.
// NOTE(review): the call that starts the manager and any follow-up check that the metrics are
// gone are not visible in this view.
def testLogRecoveryMetricsShouldBeRemovedAfterLogRecovered(): Unit = {
val logDir1 = TestUtils.tempDir()
val logDir2 = TestUtils.tempDir()
val logDirs = Seq(logDir1, logDir2)
val recoveryThreadsPerDataDir = 2
// create logManager with expected recovery thread number
logManager = createLogManager(logDirs, recoveryThreadsPerDataDir = recoveryThreadsPerDataDir)
val spyLogManager = spy(logManager)
assertEquals(2, spyLogManager.liveLogDirs.size)
// start the logManager to do log recovery
// make sure log recovery metrics are added and removed once
verify(spyLogManager, times(1)).addLogRecoveryMetrics(any[ConcurrentMap[String, Int]], any[ConcurrentMap[String, Int]])
verify(spyLogManager, times(1)).removeLogRecoveryMetrics()
// Creates a log, deletes it asynchronously, recreates it before the old files are removed, and
// then advances time past the file-delete delay, following the "Log" metrics for the topic.
// NOTE(review): the body of the metric foreach, every call to verifyMetrics, and the assertions
// after each step are not visible in this view.
def testMetricsExistWhenLogIsRecreatedBeforeDeletion(): Unit = {
val topicName = "metric-test"
// A def so the registry is re-queried each time the metrics are inspected.
def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.
filter(metric => metric.getType == "Log" && metric.getScope.contains(topicName))
val tp = new TopicPartition(topicName, 0)
val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
// Expects exactly one registered metric per name in LogMetricNames.allMetricNames.
def verifyMetrics(): Unit = {
assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size)
logMetrics.foreach { metric =>
// Create the Log and assert that the metrics are present
logManager.getOrCreateLog(tp, topicId = None)
// Trigger the deletion and assert that the metrics have been removed
val removedLog = logManager.asyncDelete(tp).get
// Recreate the Log and assert that the metrics are present
logManager.getOrCreateLog(tp, topicId = None)
// Advance time past the file deletion delay and assert that the removed log has been deleted but the metrics
// are still present
time.sleep(logConfig.fileDeleteDelayMs + 1)
// Creates a current log in dir1 and a future log in dir2 for the same partition, then checks
// that no metric whose MBean name contains "is-future" is left behind.
// NOTE(review): the body of the metric foreach, the calls to verifyMetrics, and the call that
// replaces the current log with the future one are not visible in this view.
def testMetricsAreRemovedWhenMovingCurrentToFutureLog(): Unit = {
val dir1 = TestUtils.tempDir()
val dir2 = TestUtils.tempDir()
logManager = createLogManager(Seq(dir1, dir2))
val topicName = "future-log"
// A def so the registry is re-queried each time the metrics are inspected.
def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.
filter(metric => metric.getType == "Log" && metric.getScope.contains(topicName))
val tp = new TopicPartition(topicName, 0)
val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
// logCount is the number of logs (current and/or future) expected to have a full metric set.
def verifyMetrics(logCount: Int): Unit = {
assertEquals(LogMetricNames.allMetricNames.size * logCount, logMetrics.size)
logMetrics.foreach { metric =>
// Create the current and future logs and verify that metrics are present for both current and future logs
logManager.maybeUpdatePreferredLogDir(tp, dir1.getAbsolutePath)
logManager.getOrCreateLog(tp, topicId = None)
logManager.maybeUpdatePreferredLogDir(tp, dir2.getAbsolutePath)
logManager.getOrCreateLog(tp, isFuture = true, topicId = None)
// Replace the current log with the future one and verify that only one set of metrics are present
// the future log is gone, so we have to make sure the metrics gets gone also.
assertEquals(0, logMetrics.count(m => m.getMBeanName.contains("is-future")))
// Trigger the deletion of the former current directory and verify that one set of metrics is still present
time.sleep(logConfig.fileDeleteDelayMs + 1)
// Exercises LogManager.waitForAllToComplete with mocked futures: every future's get() must be
// invoked regardless of order or failures, the callback must fire once per failing future, and
// the result must be true only when no future failed.
// NOTE(review): the closing braces of both thenAnswer stubs, and the value the `success` stub
// returns, are not visible in this view.
def testWaitForAllToComplete(): Unit = {
// Counts every Future.get() call across both mocks.
var invokedCount = 0
val success: Future[Boolean] = Mockito.mock(classOf[Future[Boolean]])
Mockito.when(success.get()).thenAnswer { _ =>
invokedCount += 1
val failure: Future[Boolean] = Mockito.mock(classOf[Future[Boolean]])
Mockito.when(failure.get()).thenAnswer{ _ =>
invokedCount += 1
throw new RuntimeException
// Incremented by the error callback passed to waitForAllToComplete.
var failureCount = 0
// all futures should be evaluated
assertFalse(LogManager.waitForAllToComplete(Seq(success, failure), _ => failureCount += 1))
assertEquals(2, invokedCount)
assertEquals(1, failureCount)
// A failure in first position must not stop the later future from being evaluated.
assertFalse(LogManager.waitForAllToComplete(Seq(failure, success), _ => failureCount += 1))
assertEquals(4, invokedCount)
assertEquals(2, failureCount)
// All successes: returns true and the callback is not invoked.
assertTrue(LogManager.waitForAllToComplete(Seq(success, success), _ => failureCount += 1))
assertEquals(6, invokedCount)
assertEquals(2, failureCount)
// All failures: the callback fires once per future.
assertFalse(LogManager.waitForAllToComplete(Seq(failure, failure), _ => failureCount += 1))
assertEquals(8, invokedCount)
assertEquals(4, failureCount)