/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.cluster

import java.util.{Optional, Properties}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.metadata.MockConfigRepository
import kafka.utils._
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.Mockito.{mock, when}
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

/**
* Verifies that slow appends to the log don't block request threads processing replica fetch requests.
*
* The test simulates:
* 1) Produce request handling, by appending to the log as leader.
* 2) Replica fetch request handling, by updating ISR and HW based on the log read result.
* 3) In some tests, a scheduler thread that checks for or performs ISR shrink when a replica falls out of the ISR.
*/
class PartitionLockTest extends Logging {
val numReplicaFetchers = 2
val numProducers = 3
val numRecordsPerProducer = 5
val mockTime = new MockTime() // Used by the ISR-shrink check
val tmpDir = TestUtils.tempDir()
val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val executorService = Executors.newFixedThreadPool(numReplicaFetchers + numProducers + 1)
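// appendSemaphore gates every SlowLog append: each append blocks until a permit is
// released, simulating a slow append while the partition's read lock is held.
// shrinkIsrSemaphore parks prepareIsrShrink, letting tests control when the
// write-lock ISR-shrink path proceeds.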
val appendSemaphore = new Semaphore(0)
val shrinkIsrSemaphore = new Semaphore(0)
var logManager: LogManager = _
var partition: Partition = _
private val topicPartition = new TopicPartition("test-topic", 0)
@BeforeEach
def setUp(): Unit = {
val logConfig = new LogConfig(new Properties)
val configRepository = MockConfigRepository.forTopic(topicPartition.topic, createLogProperties(Map.empty))
logManager = TestUtils.createLogManager(Seq(logDir), logConfig, configRepository,
new CleanerConfig(false), mockTime)
partition = setupPartitionWithMocks(logManager)
}
@AfterEach
def tearDown(): Unit = {
executorService.shutdownNow()
logManager.liveLogDirs.foreach(Utils.delete)
Utils.delete(tmpDir)
}
/**
* Verifies that delays in appending to the leader while processing produce requests have no
* impact on the timing of log read result updates when processing replica fetch requests, if no
* ISR update is required.
*/
@Test
def testNoLockContentionWithoutIsrUpdate(): Unit = {
concurrentProduceFetchWithReadLockOnly()
}
/**
* Verifies that delays in appending to the leader while processing produce requests have no
* impact on the timing of log read result updates when processing replica fetch requests, even
* while a scheduler thread is checking for ISR shrink conditions, as long as no ISR update is
* required.
*/
@Test
def testAppendReplicaFetchWithSchedulerCheckForShrinkIsr(): Unit = {
val active = new AtomicBoolean(true)
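// mockTimeSleepMs = 0 keeps mock time frozen, so maybeShrinkIsr never finds an
// out-of-sync follower and the check stays on the read-lock-only path.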
val future = scheduleShrinkIsr(active, mockTimeSleepMs = 0)
concurrentProduceFetchWithReadLockOnly()
active.set(false)
future.get(15, TimeUnit.SECONDS)
}
/**
* Verifies concurrent produce and replica fetch log read result updates with ISR updates. This
* can delay processing of produce and replica fetch requests since the write lock is obtained,
* but it should complete without any failures.
*/
@Test
def testAppendReplicaFetchWithUpdateIsr(): Unit = {
val active = new AtomicBoolean(true)
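// Advancing mock time by 10s per iteration eventually makes the followers look
// out of sync, driving maybeShrinkIsr into the write-lock path, where the
// prepareIsrShrink override below blocks on shrinkIsrSemaphore.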
val future = scheduleShrinkIsr(active, mockTimeSleepMs = 10000)
TestUtils.waitUntilTrue(() => shrinkIsrSemaphore.hasQueuedThreads, "shrinkIsr not invoked")
concurrentProduceFetchWithWriteLock()
active.set(false)
future.get(15, TimeUnit.SECONDS)
}
/**
* Verifies that concurrent calls to updateAssignmentAndIsr never allow lock-free access to the
* inner remoteReplicaMap (via getReplica) to observe an intermediate state in which a replica
* present in both the old and the new assignment is missing.
*/
@Test
def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
val active = new AtomicBoolean(true)
val replicaToCheck = 3
val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
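// Replica 3 appears in both assignments, so getReplica(3) must never observe it
// missing while makeLeader flips between the two sets.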
def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrPartitionState()
.setControllerEpoch(1)
.setLeader(replicas.get(0))
.setLeaderEpoch(1)
.setIsr(replicas)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true)
val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
// Update replica set synchronously first to avoid race conditions
partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints, None)
assertTrue(partition.getReplica(replicaToCheck).isDefined, s"Expected replica $replicaToCheck to be defined")
val future = executorService.submit((() => {
var i = 0
// Flip assignment between two replica sets
while (active.get) {
val replicas = if (i % 2 == 0) {
firstReplicaSet
} else {
secondReplicaSet
}
partition.makeLeader(partitionState(replicas), offsetCheckpoints, None)
i += 1
Thread.sleep(1) // just to avoid tight loop
}
}): Runnable)
val deadline = 1.seconds.fromNow
while (deadline.hasTimeLeft()) {
assertTrue(partition.getReplica(replicaToCheck).isDefined, s"Expected replica $replicaToCheck to be defined")
}
active.set(false)
future.get(5, TimeUnit.SECONDS)
assertTrue(partition.getReplica(replicaToCheck).isDefined, s"Expected replica $replicaToCheck to be defined")
}
/**
* Performs concurrent appends and replica fetch requests that don't require the write lock to
* update follower state. Releases enough append permits to complete all but one append.
* Verifies that follower state updates complete even though an append holding the read lock is
* in progress, then releases the permit for the final append and verifies that all appends and
* follower updates complete.
*/
private def concurrentProduceFetchWithReadLockOnly(): Unit = {
val leaderEpoch = partition.getLeaderEpoch
val appendFutures = scheduleAppends()
val stateUpdateFutures = scheduleFollowerFetches(leaderEpoch, numRecords = numProducers * numRecordsPerProducer - 1)
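// Release all but one append permit: the final append stays parked in SlowLog while
// its thread holds the read lock. The follower fetches also need only the read lock,
// so they must still complete.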
appendSemaphore.release(numProducers * numRecordsPerProducer - 1)
stateUpdateFutures.foreach(_.get(15, TimeUnit.SECONDS))
appendSemaphore.release(1)
scheduleFollowerFetches(leaderEpoch, numRecords = 1).foreach(_.get(15, TimeUnit.SECONDS)) // just to make sure follower state update still works
appendFutures.foreach(_.get(15, TimeUnit.SECONDS))
}
/**
* Performs concurrent appends and replica fetch requests that may require the write lock to
* update follower state. Threads waiting for the write lock to update follower state while an
* append thread holds the read lock prevent other threads from acquiring the read or write lock.
* So enough permits for all appends to complete are released before verifying state updates.
*/
private def concurrentProduceFetchWithWriteLock(): Unit = {
val leaderEpoch = partition.getLeaderEpoch
val appendFutures = scheduleAppends()
val stateUpdateFutures = scheduleFollowerFetches(leaderEpoch, numRecords = numProducers * numRecordsPerProducer)
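// The shrink thread is parked in prepareIsrShrink while holding the write lock,
// so neither the fetches nor the appends can acquire the read lock yet.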
assertFalse(stateUpdateFutures.exists(_.isDone))
appendSemaphore.release(numProducers * numRecordsPerProducer)
assertFalse(appendFutures.exists(_.isDone))
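// Let the shrink complete and release the write lock; all appends and fetches
// can now finish.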
shrinkIsrSemaphore.release()
stateUpdateFutures.foreach(_.get(15, TimeUnit.SECONDS))
appendFutures.foreach(_.get(15, TimeUnit.SECONDS))
}
private def scheduleAppends(): Seq[Future[_]] = {
(0 until numProducers).map { _ =>
executorService.submit((() => {
try {
append(partition, numRecordsPerProducer)
} catch {
case e: Throwable =>
error("Exception during append", e)
throw e
}
}): Runnable)
}
}
private def scheduleFollowerFetches(leaderEpoch: Int, numRecords: Int): Seq[Future[_]] = {
(1 to numReplicaFetchers).map { index =>
executorService.submit((() => {
try {
fetchFollower(partition, index, leaderEpoch, numRecords)
} catch {
case e: Throwable =>
error("Exception during updateFollowerFetchState", e)
throw e
}
}): Runnable)
}
}
private def scheduleShrinkIsr(activeFlag: AtomicBoolean, mockTimeSleepMs: Long): Future[_] = {
executorService.submit((() => {
while (activeFlag.get) {
if (mockTimeSleepMs > 0)
mockTime.sleep(mockTimeSleepMs)
partition.maybeShrinkIsr()
Thread.sleep(1) // just to avoid tight loop
}
}): Runnable)
}
private def setupPartitionWithMocks(logManager: LogManager): Partition = {
val leaderEpoch = 1
val brokerId = 0
val isrChangeListener: AlterPartitionListener = mock(classOf[AlterPartitionListener])
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
val alterIsrManager: AlterPartitionManager = mock(classOf[AlterPartitionManager])
logManager.startup(Set.empty)
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = kafka.server.Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => 1L,
mockTime,
isrChangeListener,
delayedOperations,
metadataCache,
logManager,
alterIsrManager) {
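// prepareIsrShrink runs on the ISR-shrink (write lock) path; parking it on
// shrinkIsrSemaphore lets tests control exactly when the shrink, and with it
// the write lock, is released.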
override def prepareIsrShrink(
currentState: CommittedPartitionState,
outOfSyncReplicaIds: Set[Int]
): PendingShrinkIsr = {
shrinkIsrSemaphore.acquire()
try {
super.prepareIsrShrink(currentState, outOfSyncReplicaIds)
} finally {
shrinkIsrSemaphore.release()
}
}
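// Wrap the real log in SlowLog so that every append blocks on appendSemaphore.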
override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeout = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,
maxTransactionTimeout,
producerStateManagerConfig,
mockTime
)
val offsets = new LogLoader(
log.dir,
log.topicPartition,
log.config,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
segments,
0L,
0L,
leaderEpochCache,
producerStateManager
).load()
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
logDirFailureChannel)
new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
}
}
val topicIdPartition = new TopicIdPartition(partition.topicId.getOrElse(Uuid.ZERO_UUID), topicPartition)
when(offsetCheckpoints.fetch(
ArgumentMatchers.anyString,
ArgumentMatchers.eq(topicPartition)
)).thenReturn(None)
when(alterIsrManager.submit(
ArgumentMatchers.eq(topicIdPartition),
ArgumentMatchers.any[LeaderAndIsr],
ArgumentMatchers.anyInt()
)).thenReturn(new CompletableFuture[LeaderAndIsr]())
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val controllerEpoch = 0
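// Broker 0 is the leader; brokers 1..numReplicaFetchers are followers, all starting in the ISR.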
val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
val isr = replicas
assertTrue(partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")
partition
}
private def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
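// Tiny segment and index sizes keep segments small and force frequent segment rolls during the test.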
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }
logProps
}
private def append(
partition: Partition,
numRecords: Int
): Unit = {
val requestLocal = RequestLocal.withThreadConfinedCaching
(0 until numRecords).foreach { _ =>
val batch = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)))
partition.appendRecordsToLeader(batch, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)
}
}
private def fetchFollower(
partition: Partition,
followerId: Int,
leaderEpoch: Int,
numRecords: Int
): Unit = {
val logStartOffset = 0L
var fetchOffset = 0L
var lastFetchedEpoch = Optional.empty[Integer]
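// maxBytes = 1 combined with minOneMessage = true below returns at most one batch
// per fetch, so each loop iteration advances the follower by a single batch.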
val maxBytes = 1
while (fetchOffset < numRecords) {
val fetchParams = new FetchParams(
ApiKeys.FETCH.latestVersion,
followerId,
1,
0L,
1,
maxBytes,
FetchIsolation.LOG_END,
Optional.empty()
)
val fetchPartitionData = new FetchRequest.PartitionData(
Uuid.ZERO_UUID,
fetchOffset,
logStartOffset,
maxBytes,
Optional.of(Int.box(leaderEpoch)),
lastFetchedEpoch
)
val logReadInfo = partition.fetchRecords(
fetchParams,
fetchPartitionData,
mockTime.milliseconds(),
maxBytes,
minOneMessage = true,
updateFetchState = true
)
assertFalse(logReadInfo.divergingEpoch.isPresent)
val batches = logReadInfo.fetchedData.records.batches.asScala
if (batches.nonEmpty) {
assertEquals(1, batches.size)
val batch = batches.head
lastFetchedEpoch = Optional.of(Int.box(batch.partitionLeaderEpoch))
fetchOffset = batch.lastOffset + 1
}
}
}
private class SlowLog(
log: UnifiedLog,
logStartOffset: Long,
localLog: LocalLog,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
appendSemaphore: Semaphore
) extends UnifiedLog(
logStartOffset,
localLog,
new BrokerTopicStats,
log.producerIdExpirationCheckIntervalMs,
leaderEpochCache,
producerStateManager,
_topicId = None,
keepPartitionMetadataFile = true) {
override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal): LogAppendInfo = {
val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal)
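// Park after the real append completes; the caller invoked appendAsLeader under
// the partition's read lock, so this simulates a slow append holding that lock.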
appendSemaphore.acquire()
appendInfo
}
}
}