/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import java.net.InetAddress
import com.yammer.metrics.core.Metric
import kafka.common.UnexpectedAppendOffsetException
import kafka.log._
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AlterPartitionResponse, FetchRequest, ListOffsetsRequest, RequestHeader}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogReadInfo, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
object PartitionTest {
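// Test listener that records the most recent callback values from the Partition
// so tests can assert on them via verify(), which also resets the recorded state.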
class MockPartitionListener extends PartitionListener {
private var highWatermark: Long = -1L
private var failed: Boolean = false
private var deleted: Boolean = false
override def onHighWatermarkUpdated(partition: TopicPartition, offset: Long): Unit = {
highWatermark = offset
}
override def onFailed(partition: TopicPartition): Unit = {
failed = true
}
override def onDeleted(partition: TopicPartition): Unit = {
deleted = true
}
private def clear(): Unit = {
highWatermark = -1L
failed = false
deleted = false
}
/**
* Verifies the callbacks that have been triggered since the last
* verification. A high watermark other than `-1`, or a flag set to
* `true`, indicates that the corresponding callback has fired.
*/
def verify(
expectedHighWatermark: Long = -1L,
expectedFailed: Boolean = false,
expectedDeleted: Boolean = false
): Unit = {
assertEquals(expectedHighWatermark, highWatermark,
"Unexpected high watermark")
assertEquals(expectedFailed, failed,
"Unexpected failed")
assertEquals(expectedDeleted, deleted,
"Unexpected deleted")
clear()
}
}
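// Builds FetchParams for a follower (replica) fetch with test-friendly defaults.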
def followerFetchParams(
replicaId: Int,
replicaEpoch: Long = 1L,
maxWaitMs: Long = 0L,
minBytes: Int = 1,
maxBytes: Int = Int.MaxValue
): FetchParams = {
new FetchParams(
ApiKeys.FETCH.latestVersion,
replicaId,
replicaEpoch,
maxWaitMs,
minBytes,
maxBytes,
FetchIsolation.LOG_END,
Optional.empty()
)
}
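// Builds FetchParams for a consumer fetch using the consumer replica id.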
def consumerFetchParams(
maxWaitMs: Long = 0L,
minBytes: Int = 1,
maxBytes: Int = Int.MaxValue,
clientMetadata: Option[ClientMetadata] = None,
isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK
): FetchParams = {
new FetchParams(
ApiKeys.FETCH.latestVersion,
FetchRequest.CONSUMER_REPLICA_ID,
-1,
maxWaitMs,
minBytes,
maxBytes,
isolation,
clientMetadata.asJava
)
}
}
class PartitionTest extends AbstractPartitionTest {
import PartitionTest._
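// Verifies that the leader uses the follower's last fetched epoch and fetch offset
// to detect log divergence and reports the correct diverging epoch and end offset.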
@Test
def testLastFetchedOffsetValidation(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
def append(leaderEpoch: Int, count: Int): Unit = {
val recordArray = (1 to count).map { i =>
new SimpleRecord(s"$i".getBytes)
}
val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
recordArray: _*)
log.appendAsLeader(records, leaderEpoch = leaderEpoch)
}
append(leaderEpoch = 0, count = 2) // 0
append(leaderEpoch = 3, count = 3) // 2
append(leaderEpoch = 3, count = 3) // 5
append(leaderEpoch = 4, count = 5) // 8
append(leaderEpoch = 7, count = 1) // 13
append(leaderEpoch = 9, count = 3) // 14
assertEquals(17L, log.logEndOffset)
val leaderEpoch = 10
val logStartOffset = 0L
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
def epochEndOffset(epoch: Int, endOffset: Long): FetchResponseData.EpochEndOffset = {
new FetchResponseData.EpochEndOffset()
.setEpoch(epoch)
.setEndOffset(endOffset)
}
def read(lastFetchedEpoch: Int, fetchOffset: Long): LogReadInfo = {
fetchFollower(
partition,
remoteReplicaId,
fetchOffset,
logStartOffset,
leaderEpoch = Some(leaderEpoch),
lastFetchedEpoch = Some(lastFetchedEpoch)
)
}
def assertDivergence(
divergingEpoch: FetchResponseData.EpochEndOffset,
readInfo: LogReadInfo
): Unit = {
assertEquals(Optional.of(divergingEpoch), readInfo.divergingEpoch)
assertEquals(0, readInfo.fetchedData.records.sizeInBytes)
}
def assertNoDivergence(readInfo: LogReadInfo): Unit = {
assertEquals(Optional.empty(), readInfo.divergingEpoch)
}
assertDivergence(epochEndOffset(epoch = 0, endOffset = 2), read(lastFetchedEpoch = 2, fetchOffset = 5))
assertDivergence(epochEndOffset(epoch = 0, endOffset = 2), read(lastFetchedEpoch = 0, fetchOffset = 4))
assertDivergence(epochEndOffset(epoch = 4, endOffset = 13), read(lastFetchedEpoch = 6, fetchOffset = 6))
assertDivergence(epochEndOffset(epoch = 4, endOffset = 13), read(lastFetchedEpoch = 5, fetchOffset = 9))
assertDivergence(epochEndOffset(epoch = 10, endOffset = 17), read(lastFetchedEpoch = 10, fetchOffset = 18))
assertNoDivergence(read(lastFetchedEpoch = 0, fetchOffset = 2))
assertNoDivergence(read(lastFetchedEpoch = 7, fetchOffset = 14))
assertNoDivergence(read(lastFetchedEpoch = 9, fetchOffset = 17))
assertNoDivergence(read(lastFetchedEpoch = 10, fetchOffset = 17))
// Reads from epochs larger than we know about should cause an out of range error
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 11, fetchOffset = 5))
// Move log start offset to the middle of epoch 3
log.updateHighWatermark(log.logEndOffset)
log.maybeIncrementLogStartOffset(newLogStartOffset = 5L, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertDivergence(epochEndOffset(epoch = 2, endOffset = 5), read(lastFetchedEpoch = 2, fetchOffset = 8))
assertNoDivergence(read(lastFetchedEpoch = 0, fetchOffset = 5))
assertNoDivergence(read(lastFetchedEpoch = 3, fetchOffset = 5))
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 0, fetchOffset = 0))
// Fetch offset lower than start offset should throw OffsetOutOfRangeException
log.maybeIncrementLogStartOffset(newLogStartOffset = 10, LogStartOffsetIncrementReason.ClientRecordDeletion)
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 5, fetchOffset = 6)) // diverging
assertThrows(classOf[OffsetOutOfRangeException], () => read(lastFetchedEpoch = 3, fetchOffset = 6)) // not diverging
}
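// Becoming leader should update the leader epoch cache so that
// lastOffsetForLeaderEpoch returns the log end offset for the new epoch.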
@Test
def testMakeLeaderUpdatesEpochCache(): Unit = {
val leaderEpoch = 8
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
), leaderEpoch = 0)
log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 5,
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes)
), leaderEpoch = 5)
assertEquals(4, log.logEndOffset)
val partition = setupPartitionWithMocks(leaderEpoch = leaderEpoch, isLeader = true)
assertEquals(Some(4), partition.leaderLogIfLocal.map(_.logEndOffset))
val epochEndOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpoch = Optional.of[Integer](leaderEpoch),
leaderEpoch = leaderEpoch, fetchOnlyFromLeader = true)
assertEquals(4, epochEndOffset.endOffset)
assertEquals(leaderEpoch, epochEndOffset.leaderEpoch)
}
// Verify that partition.removeFutureLocalReplica() and partition.maybeReplaceCurrentWithFutureReplica() can run concurrently
@Test
def testMaybeReplaceCurrentWithFutureReplica(): Unit = {
val latch = new CountDownLatch(1)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, None)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
val thread1 = new Thread {
override def run(): Unit = {
latch.await()
partition.removeFutureLocalReplica()
}
}
val thread2 = new Thread {
override def run(): Unit = {
latch.await()
partition.maybeReplaceCurrentWithFutureReplica()
}
}
thread1.start()
thread2.start()
latch.countDown()
thread1.join()
thread2.join()
assertEquals(None, partition.futureLog)
}
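// Replica fetches sent to a broker that is only a follower must fail with the
// appropriate leadership or epoch error.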
@Test
def testReplicaFetchToFollower(): Unit = {
val controllerEpoch = 3
val followerId = brokerId + 1
val leaderId = brokerId + 2
val replicas = List[Integer](brokerId, followerId, leaderId).asJava
val isr = List[Integer](brokerId, followerId, leaderId).asJava
val leaderEpoch = 8
val partitionEpoch = 1
assertTrue(partition.makeFollower(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(partitionEpoch)
.setReplicas(replicas)
.setIsNew(true),
offsetCheckpoints, None
))
def assertFetchFromReplicaFails[T <: ApiException](
expectedExceptionClass: Class[T],
leaderEpoch: Option[Int]
): Unit = {
assertThrows(expectedExceptionClass, () => {
fetchFollower(
partition,
replicaId = followerId,
fetchOffset = 0L,
leaderEpoch = leaderEpoch
)
})
}
assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], None)
assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], Some(leaderEpoch))
assertFetchFromReplicaFails(classOf[UnknownLeaderEpochException], Some(leaderEpoch + 1))
assertFetchFromReplicaFails(classOf[FencedLeaderEpochException], Some(leaderEpoch - 1))
}
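// Fetches from replicas the leader does not know about are rejected until the
// replicas are added through a reassignment.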
@Test
def testFetchFromUnrecognizedFollower(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val validReplica = brokerId + 1
val addingReplica1 = brokerId + 2
val addingReplica2 = brokerId + 3
val replicas = List(leader, validReplica)
val isr = List[Integer](leader, validReplica).asJava
val leaderEpoch = 8
val partitionEpoch = 1
assertTrue(partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(partitionEpoch)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None
))
assertThrows(classOf[UnknownLeaderEpochException], () => {
fetchFollower(
partition,
replicaId = addingReplica1,
fetchOffset = 0L,
leaderEpoch = Some(leaderEpoch)
)
})
assertEquals(None, partition.getReplica(addingReplica1).map(_.stateSnapshot.logEndOffset))
assertThrows(classOf[NotLeaderOrFollowerException], () => {
fetchFollower(
partition,
replicaId = addingReplica2,
fetchOffset = 0L,
leaderEpoch = None
)
})
assertEquals(None, partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset))
// The replicas are added as part of a reassignment
val newReplicas = List(leader, validReplica, addingReplica1, addingReplica2)
val newPartitionEpoch = partitionEpoch + 1
val addingReplicas = List(addingReplica1, addingReplica2)
assertFalse(partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(newPartitionEpoch)
.setReplicas(newReplicas.map(Int.box).asJava)
.setAddingReplicas(addingReplicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None
))
// Now the fetches are allowed
assertEquals(0L, fetchFollower(
partition,
replicaId = addingReplica1,
fetchOffset = 0L,
leaderEpoch = Some(leaderEpoch)
).logEndOffset)
assertEquals(Some(0L), partition.getReplica(addingReplica1).map(_.stateSnapshot.logEndOffset))
assertEquals(0L, fetchFollower(
partition,
replicaId = addingReplica2,
fetchOffset = 0L,
leaderEpoch = None
).logEndOffset)
assertEquals(Some(0L), partition.getReplica(addingReplica2).map(_.stateSnapshot.logEndOffset))
}
// Verify that partition.makeFollower() and partition.appendRecordsToFollowerOrFutureReplica() can run concurrently
@Test
def testMakeFollowerWithFollowerAppendRecords(): Unit = {
val appendSemaphore = new Semaphore(0)
val mockTime = new MockTime()
partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager) {
override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.recordVersion, "")
val maxTransactionTimeoutMs = 5 * 60 * 1000
val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, true)
val producerStateManager = new ProducerStateManager(
log.topicPartition,
log.dir,
maxTransactionTimeoutMs,
producerStateManagerConfig,
mockTime
)
val offsets = new LogLoader(
log.dir,
log.topicPartition,
log.config,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
segments = segments,
logStartOffsetCheckpoint = 0L,
recoveryPointCheckpoint = 0L,
leaderEpochCache,
producerStateManager
).load()
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
logDirFailureChannel)
new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
}
}
partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, None)
val appendThread = new Thread {
override def run(): Unit = {
val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)),
baseOffset = 0)
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
}
}
appendThread.start()
TestUtils.waitUntilTrue(() => appendSemaphore.hasQueuedThreads, "follower log append is not called.")
val partitionState = new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(2)
.setLeaderEpoch(1)
.setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setPartitionEpoch(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None))
appendSemaphore.release()
appendThread.join()
assertEquals(2L, partition.localLogOrException.logEndOffset)
assertEquals(2L, partition.leaderReplicaIdOpt.get)
}
@Test
// Verify that replacement works when the replicas have the same log end offset but different base offsets in the
// active segment
def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, None)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
// Write records with duplicate keys to current replica and roll at offset 6
val currentLog = partition.log.get
currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k1".getBytes, "v2".getBytes),
new SimpleRecord("k1".getBytes, "v3".getBytes),
new SimpleRecord("k2".getBytes, "v4".getBytes),
new SimpleRecord("k2".getBytes, "v5".getBytes),
new SimpleRecord("k2".getBytes, "v6".getBytes)
), leaderEpoch = 0)
currentLog.roll()
currentLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k3".getBytes, "v7".getBytes),
new SimpleRecord("k4".getBytes, "v8".getBytes)
), leaderEpoch = 0)
// Write to the future replica as if the log had been compacted, and do not roll the segment
val buffer = ByteBuffer.allocate(1024)
val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, 0)
builder.appendWithOffset(2L, new SimpleRecord("k1".getBytes, "v3".getBytes))
builder.appendWithOffset(5L, new SimpleRecord("k2".getBytes, "v6".getBytes))
builder.appendWithOffset(6L, new SimpleRecord("k3".getBytes, "v7".getBytes))
builder.appendWithOffset(7L, new SimpleRecord("k4".getBytes, "v8".getBytes))
val futureLog = partition.futureLocalLogOrException
futureLog.appendAsFollower(builder.build())
assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
}
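// fetchOffsetSnapshot on the leader should validate the caller's current leader epoch.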
@Test
def testFetchOffsetSnapshotEpochValidationForLeader(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer]): Unit = {
try {
partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true)
assertEquals(Errors.NONE, expectedError)
} catch {
case error: ApiException => assertEquals(expectedError, Errors.forException(error))
}
}
assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1))
assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1))
assertSnapshotError(Errors.NONE, Optional.of(leaderEpoch))
assertSnapshotError(Errors.NONE, Optional.empty())
}
@Test
def testFetchOffsetSnapshotEpochValidationForFollower(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = false)
def assertSnapshotError(expectedError: Errors,
currentLeaderEpoch: Optional[Integer],
fetchOnlyLeader: Boolean): Unit = {
try {
partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader)
assertEquals(Errors.NONE, expectedError)
} catch {
case error: ApiException => assertEquals(expectedError, Errors.forException(error))
}
}
assertSnapshotError(Errors.NONE, Optional.of(leaderEpoch), fetchOnlyLeader = false)
assertSnapshotError(Errors.NONE, Optional.empty(), fetchOnlyLeader = false)
assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
assertSnapshotError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
assertSnapshotError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@Test
def testOffsetForLeaderEpochValidationForLeader(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
def assertLastOffsetForLeaderError(error: Errors, currentLeaderEpochOpt: Optional[Integer]): Unit = {
val endOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpochOpt, 0,
fetchOnlyFromLeader = true)
assertEquals(error.code, endOffset.errorCode)
}
assertLastOffsetForLeaderError(Errors.NONE, Optional.empty())
assertLastOffsetForLeaderError(Errors.NONE, Optional.of(leaderEpoch))
assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1))
assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1))
}
@Test
def testOffsetForLeaderEpochValidationForFollower(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = false)
def assertLastOffsetForLeaderError(error: Errors,
currentLeaderEpochOpt: Optional[Integer],
fetchOnlyLeader: Boolean): Unit = {
val endOffset = partition.lastOffsetForLeaderEpoch(currentLeaderEpochOpt, 0,
fetchOnlyFromLeader = fetchOnlyLeader)
assertEquals(error.code, endOffset.errorCode)
}
assertLastOffsetForLeaderError(Errors.NONE, Optional.empty(), fetchOnlyLeader = false)
assertLastOffsetForLeaderError(Errors.NONE, Optional.of(leaderEpoch), fetchOnlyLeader = false)
assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
assertLastOffsetForLeaderError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
assertLastOffsetForLeaderError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@Test
def testLeaderEpochValidationOnLeader(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
def sendFetch(leaderEpoch: Option[Int]): LogReadInfo = {
fetchFollower(
partition,
remoteReplicaId,
fetchOffset = 0L,
leaderEpoch = leaderEpoch
)
}
assertEquals(0L, sendFetch(leaderEpoch = None).logEndOffset)
assertEquals(0L, sendFetch(leaderEpoch = Some(leaderEpoch)).logEndOffset)
assertThrows(classOf[FencedLeaderEpochException], () => sendFetch(Some(leaderEpoch - 1)))
assertThrows(classOf[UnknownLeaderEpochException], () => sendFetch(Some(leaderEpoch + 1)))
}
@Test
def testLeaderEpochValidationOnFollower(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = false)
def sendFetch(
leaderEpoch: Option[Int],
clientMetadata: Option[ClientMetadata]
): LogReadInfo = {
fetchConsumer(
partition,
fetchOffset = 0L,
leaderEpoch = leaderEpoch,
clientMetadata = clientMetadata
)
}
// Follower fetching is only allowed when the client provides metadata
assertThrows(classOf[NotLeaderOrFollowerException], () => sendFetch(None, None))
assertThrows(classOf[NotLeaderOrFollowerException], () => sendFetch(Some(leaderEpoch), None))
assertThrows(classOf[FencedLeaderEpochException], () => sendFetch(Some(leaderEpoch - 1), None))
assertThrows(classOf[UnknownLeaderEpochException], () => sendFetch(Some(leaderEpoch + 1), None))
val clientMetadata = new DefaultClientMetadata(
"rack",
"clientId",
InetAddress.getLoopbackAddress,
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value
)
assertEquals(0L, sendFetch(leaderEpoch = None, Some(clientMetadata)).logEndOffset)
assertEquals(0L, sendFetch(leaderEpoch = Some(leaderEpoch), Some(clientMetadata)).logEndOffset)
assertThrows(classOf[FencedLeaderEpochException], () => sendFetch(Some(leaderEpoch - 1), Some(clientMetadata)))
assertThrows(classOf[UnknownLeaderEpochException], () => sendFetch(Some(leaderEpoch + 1), Some(clientMetadata)))
}
@Test
def testFetchOffsetForTimestampEpochValidationForLeader(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
def assertFetchOffsetError(error: Errors,
currentLeaderEpochOpt: Optional[Integer]): Unit = {
try {
partition.fetchOffsetForTimestamp(0L,
isolationLevel = None,
currentLeaderEpoch = currentLeaderEpochOpt,
fetchOnlyFromLeader = true)
if (error != Errors.NONE)
fail(s"Expected readRecords to fail with error $error")
} catch {
case e: Exception =>
assertEquals(error, Errors.forException(e))
}
}
assertFetchOffsetError(Errors.NONE, Optional.empty())
assertFetchOffsetError(Errors.NONE, Optional.of(leaderEpoch))
assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1))
assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1))
}
@Test
def testFetchOffsetForTimestampEpochValidationForFollower(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = false)
def assertFetchOffsetError(error: Errors,
currentLeaderEpochOpt: Optional[Integer],
fetchOnlyLeader: Boolean): Unit = {
try {
partition.fetchOffsetForTimestamp(0L,
isolationLevel = None,
currentLeaderEpoch = currentLeaderEpochOpt,
fetchOnlyFromLeader = fetchOnlyLeader)
if (error != Errors.NONE)
fail(s"Expected readRecords to fail with error $error")
} catch {
case e: Exception =>
assertEquals(error, Errors.forException(e))
}
}
assertFetchOffsetError(Errors.NONE, Optional.empty(), fetchOnlyLeader = false)
assertFetchOffsetError(Errors.NONE, Optional.of(leaderEpoch), fetchOnlyLeader = false)
assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
assertFetchOffsetError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
assertFetchOffsetError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@Test
def testFetchLatestOffsetIncludesLeaderEpoch(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
val timestampAndOffsetOpt = partition.fetchOffsetForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
isolationLevel = None,
currentLeaderEpoch = Optional.empty(),
fetchOnlyFromLeader = true)
assertTrue(timestampAndOffsetOpt.isDefined)
val timestampAndOffset = timestampAndOffsetOpt.get
assertEquals(leaderEpoch, timestampAndOffset.leaderEpoch.get)
}
/**
* This test checks that after a new leader election, we don't answer any ListOffsetsRequest until
* the HW of the new leader has caught up to the start offset of its epoch. From a client
* perspective this helps guarantee monotonic offsets.
*
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change">KIP-207</a>
*/
@Test
def testMonotonicOffsetsAfterLeaderChange(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val replicas = List(leader, follower1, follower2)
val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8
val batch1 = TestUtils.records(records = List(
new SimpleRecord(10, "k1".getBytes, "v1".getBytes),
new SimpleRecord(11,"k2".getBytes, "v2".getBytes)))
val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, None), "Expected first makeLeader() to return 'leader changed'")
assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR")
val requestLocal = RequestLocal.withThreadConfinedCaching
// after the makeLeader() call, the partition should know about all the replicas
// append records with initial leader epoch
partition.appendRecordsToLeader(batch1, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)
partition.appendRecordsToLeader(batch2, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)
assertEquals(partition.localLogOrException.logStartOffset, partition.localLogOrException.highWatermark,
"Expected leader's HW not move")
def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
try {
Right(partition.fetchOffsetForTimestamp(
timestamp = timestamp,
isolationLevel = isolation,
currentLeaderEpoch = Optional.of(partition.getLeaderEpoch),
fetchOnlyFromLeader = true
))
} catch {
case e: ApiException => Left(e)
}
}
// let the followers fetch so that the leader's HW moves further, but stays below the LEO
fetchFollower(partition, replicaId = follower1, fetchOffset = 0L)
fetchFollower(partition, replicaId = follower1, fetchOffset = 2L)
fetchFollower(partition, replicaId = follower2, fetchOffset = 0L)
fetchFollower(partition, replicaId = follower2, fetchOffset = 2L)
// Simulate successful ISR update
alterPartitionManager.completeIsrUpdate(2)
// At this point, the leader has appended 5 records, but the followers have only fetched the first two
assertEquals(2, partition.localLogOrException.highWatermark)
// Get the LEO
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, None) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
case Right(None) => fail("Should have seen some offsets")
case Left(e) => fail("Should not have seen an error")
}
// Get the HW
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(2, offsetAndTimestamp.offset)
case Right(None) => fail("Should have seen some offsets")
case Left(e) => fail("Should not have seen an error")
}
// Requesting an offset beyond the HW by timestamp returns None
assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED)))
// Make into a follower
val followerState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(follower2)
.setLeaderEpoch(leaderEpoch + 1)
.setIsr(isr)
.setPartitionEpoch(4)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertTrue(partition.makeFollower(followerState, offsetCheckpoints, None))
// Back to leader; this resets the start offset for this epoch (to 2), so we are now in the fault condition
val newLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch + 2)
.setIsr(isr)
.setPartitionEpoch(5)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None))
// Try to get offsets as a client
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => fail("Should have failed with OffsetNotAvailable")
case Right(None) => fail("Should have seen an error")
case Left(e: OffsetNotAvailableException) => // ok
case Left(e: ApiException) => fail(s"Expected OffsetNotAvailableException, got $e")
}
// If request is not from a client, we skip the check
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, None) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
case Right(None) => fail("Should have seen some offsets")
case Left(e: ApiException) => fail(s"Got ApiException $e")
}
// If we request the earliest timestamp, we skip the check
fetchOffsetsForTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(0, offsetAndTimestamp.offset)
case Right(None) => fail("Should have seen some offsets")
case Left(e: ApiException) => fail(s"Got ApiException $e")
}
// If we request the earliest local timestamp, we skip the check
fetchOffsetsForTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(0, offsetAndTimestamp.offset)
case Right(None) => fail("Should have seen some offsets")
case Left(e: ApiException) => fail(s"Got ApiException $e")
}
// If we request an offset by timestamp earlier than the HW, we are ok
fetchOffsetsForTimestamp(11, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) =>
assertEquals(1, offsetAndTimestamp.offset)
assertEquals(11, offsetAndTimestamp.timestamp)
case Right(None) => fail("Should have seen some offsets")
case Left(e: ApiException) => fail(s"Got ApiException $e")
}
// Request an offset by timestamp beyond the HW, get an error now since we're in a bad state
fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => fail("Should have failed")
case Right(None) => fail("Should have failed")
case Left(e: OffsetNotAvailableException) => // ok
case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e")
}
// Next fetch from replicas moves the HW up to 5 (the LEO)
fetchFollower(partition, replicaId = follower1, fetchOffset = 5L)
fetchFollower(partition, replicaId = follower2, fetchOffset = 5L)
// Simulate successful ISR update
alterPartitionManager.completeIsrUpdate(6)
// Error goes away
fetchOffsetsForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
case Right(None) => fail("Should have seen some offsets")
case Left(e: ApiException) => fail(s"Got ApiException $e")
}
// Now we see None instead of an error for out of range timestamp
assertEquals(Right(None), fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)))
}
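// Follower appends below the log start offset are only accepted when the batch
// contains the log start offset and the log is empty.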
@Test
def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val log = partition.localLogOrException
val initialLogStartOffset = 5L
partition.truncateFullyAndStartAt(initialLogStartOffset, isFuture = false)
assertEquals(initialLogStartOffset, log.logEndOffset,
s"Log end offset after truncate fully and start at $initialLogStartOffset:")
assertEquals(initialLogStartOffset, log.logStartOffset,
s"Log start offset after truncate fully and start at $initialLogStartOffset:")
// verify that we cannot append a batch that does not contain the log start offset, even if the log is empty
assertThrows(classOf[UnexpectedAppendOffsetException], () =>
// append one record with offset = 3
partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 3L), isFuture = false)
)
assertEquals(initialLogStartOffset, log.logEndOffset,
s"Log end offset should not change after failure to append")
// verify that we can append a batch that contains the log start offset, even when its first
// offset is below the log start offset, if the log is empty
val newLogStartOffset = 4L
val records = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes),
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = newLogStartOffset)
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
assertEquals(7L, log.logEndOffset, s"Log end offset after append of 3 records with base offset $newLogStartOffset:")
assertEquals(newLogStartOffset, log.logStartOffset, s"Log start offset after append of 3 records with base offset $newLogStartOffset:")
// and we can append more records after that
partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 7L), isFuture = false)
assertEquals(8L, log.logEndOffset, s"Log end offset after append of 1 record at offset 7:")
assertEquals(newLogStartOffset, log.logStartOffset, s"Log start offset not expected to change:")
// but we cannot append to offset < log start if the log is not empty
val records2 = createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)),
baseOffset = 3L)
assertThrows(classOf[UnexpectedAppendOffsetException], () => partition.appendRecordsToFollowerOrFutureReplica(records2, isFuture = false))
assertEquals(8L, log.logEndOffset, s"Log end offset should not change after failure to append")
// we can still append at the next offset
partition.appendRecordsToFollowerOrFutureReplica(createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 8L), isFuture = false)
assertEquals(9L, log.logEndOffset, s"Log end offset after append of 1 record at offset 8:")
assertEquals(newLogStartOffset, log.logStartOffset, s"Log start offset not expected to change:")
}
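// ListOffsets should respect the isolation level: replica requests (no isolation level)
// are bounded by the log end offset, READ_UNCOMMITTED by the high watermark, and
// READ_COMMITTED by the last stable offset.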
@Test
def testListOffsetIsolationLevels(): Unit = {
val controllerEpoch = 0
val leaderEpoch = 5
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(leaderEpoch, partition.getLeaderEpoch)
val records = createTransactionalRecords(List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes),
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L)
partition.appendRecordsToLeader(records, origin = AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = {
val res = partition.fetchOffsetForTimestamp(timestamp,
isolationLevel = isolationLevel,
currentLeaderEpoch = Optional.empty(),
fetchOnlyFromLeader = true)
assertTrue(res.isDefined)
res.get
}
def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
fetchOffset(isolationLevel, ListOffsetsRequest.LATEST_TIMESTAMP)
}
def fetchEarliestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
fetchOffset(isolationLevel, ListOffsetsRequest.EARLIEST_TIMESTAMP)
}
def fetchEarliestLocalOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
fetchOffset(isolationLevel, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
}
assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
partition.log.get.updateHighWatermark(1L)
assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
assertEquals(1L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
assertEquals(0L, fetchLatestOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
assertEquals(0L, fetchEarliestOffset(isolationLevel = None).offset)
assertEquals(0L, fetchEarliestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
assertEquals(0L, fetchEarliestOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
assertEquals(0L, fetchEarliestLocalOffset(isolationLevel = None).offset)
assertEquals(0L, fetchEarliestLocalOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
assertEquals(0L, fetchEarliestLocalOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
}
@Test
def testGetReplica(): Unit = {
assertEquals(None, partition.log)
assertThrows(classOf[NotLeaderOrFollowerException], () =>
partition.localLogOrException
)
}
@Test
def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
assertThrows(classOf[NotLeaderOrFollowerException], () =>
partition.appendRecordsToFollowerOrFutureReplica(
createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
)
}
@Test
def testMakeFollowerWithNoLeaderIdChange(): Unit = {
// Start off as follower
var partitionState = new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(1)
.setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setPartitionEpoch(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
partition.makeFollower(partitionState, offsetCheckpoints, None)
// Request with the same leader and a higher epoch, do become-follower steps
partitionState = new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(4)
.setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setPartitionEpoch(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false)
assertTrue(partition.makeFollower(partitionState, offsetCheckpoints, None))
// Request with same leader and same epoch, skip become-follower steps
partitionState = new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(4)
.setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setPartitionEpoch(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
assertFalse(partition.makeFollower(partitionState, offsetCheckpoints, None))
}
@Test
def testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8
val batch1 = TestUtils.records(records = List(new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)))
val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
new SimpleRecord("k4".getBytes, "v2".getBytes),
new SimpleRecord("k5".getBytes, "v3".getBytes)))
val batch3 = TestUtils.records(records = List(new SimpleRecord("k6".getBytes, "v1".getBytes),
new SimpleRecord("k7".getBytes, "v2".getBytes)))
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true)
assertTrue(partition.makeLeader(leaderState, offsetCheckpoints, None), "Expected first makeLeader() to return 'leader changed'")
assertEquals(leaderEpoch, partition.getLeaderEpoch, "Current leader epoch")
assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR")
val requestLocal = RequestLocal.withThreadConfinedCaching
// after the makeLeader() call, the partition should know about all the replicas
// append records with initial leader epoch
val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, origin = AppendOrigin.CLIENT,
requiredAcks = 0, requestLocal).lastOffset
partition.appendRecordsToLeader(batch2, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)
assertEquals(partition.localLogOrException.logStartOffset, partition.log.get.highWatermark, "Expected leader's HW not to move")
// let the follower in the ISR move the leader's HW further, but still below the LEO
fetchFollower(partition, replicaId = follower2, fetchOffset = 0)
fetchFollower(partition, replicaId = follower2, fetchOffset = lastOffsetOfFirstBatch)
assertEquals(lastOffsetOfFirstBatch, partition.log.get.highWatermark, "Expected leader's HW")
// current leader becomes follower and then leader again (without any new records appended)
val followerState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(follower2)
.setLeaderEpoch(leaderEpoch + 1)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(false)
partition.makeFollower(followerState, offsetCheckpoints, None)
val newLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch + 2)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(false)
assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints, None),
"Expected makeLeader() to return 'leader changed' after makeFollower()")
val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
// append records with the latest leader epoch
partition.appendRecordsToLeader(batch3, origin = AppendOrigin.CLIENT, requiredAcks = 0, requestLocal)
// fetch from follower not in ISR from log start offset should not add this follower to ISR
fetchFollower(partition, replicaId = follower1, fetchOffset = 0)
fetchFollower(partition, replicaId = follower1, fetchOffset = lastOffsetOfFirstBatch)
assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR")
// fetch from the follower not in ISR from start offset of the current leader epoch should
// add this follower to ISR
fetchFollower(partition, replicaId = follower1, fetchOffset = currentLeaderEpochStartOffset)
// The expansion is reflected only in the maximal ISR until the AlterPartition request completes
assertEquals(Set[Integer](leader, follower2), partition.partitionState.isr, "ISR")
assertEquals(Set[Integer](leader, follower1, follower2), partition.partitionState.maximalIsr, "Maximal ISR")
assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.toSet,
Set(leader, follower1, follower2), "AlterIsr")
}
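// Helpers for building MemoryRecords batches used by the tests in this class.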
def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
baseOffset, time.milliseconds, partitionLeaderEpoch)
records.foreach(builder.append)
builder.build()
}
def createIdempotentRecords(records: Iterable[SimpleRecord],
baseOffset: Long,
baseSequence: Int = 0,
producerId: Long = 1L): MemoryRecords = {
val producerEpoch = 0.toShort
val isTransactional = false
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(buf, CompressionType.NONE, baseOffset, producerId,
producerEpoch, baseSequence, isTransactional)
records.foreach(builder.append)
builder.build()
}
def createTransactionalRecords(records: Iterable[SimpleRecord],
baseOffset: Long,
baseSequence: Int = 0,
producerId: Long = 1L): MemoryRecords = {
val producerEpoch = 0.toShort
val isTransactional = true
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(buf, CompressionType.NONE, baseOffset, producerId,
producerEpoch, baseSequence, isTransactional)
records.foreach(builder.append)
builder.build()
}
/**
* Test for the AtMinIsr partition state. The partition has a replica set of size 3, but only one replica in the ISR.
* Since the default minIsr configuration is 1, the partition should be at min ISR (isAtMinIsr = true).
*/
@Test
def testAtMinIsr(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader).asJava
val leaderEpoch = 8
assertFalse(partition.isAtMinIsr)
// Set the ISR to only the leader to trigger AtMinIsr (default min ISR config is 1)
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true)
partition.makeLeader(leaderState, offsetCheckpoints, None)
assertTrue(partition.isAtMinIsr)
}
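// The leader should update the follower's fetch state (log start/end offsets and
// lastCaughtUpTimeMs) as the follower fetches.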
@Test
def testUpdateFollowerFetchState(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 6, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = replicas
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val initializeTimeMs = time.milliseconds()
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
time.sleep(500)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = 0L,
logEndOffset = 3L
)
time.sleep(500)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 6L)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = 6L
)
}
@Test
def testInvalidAlterPartitionRequestsAreNotRetried(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId).asJava
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(Set(brokerId), partition.partitionState.isr)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
// Check that the ISR didn't change and an alter update is scheduled
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
assertEquals(Set(brokerId, remoteBrokerId), alterPartitionManager.isrUpdates.head.leaderAndIsr.isr.toSet)
// Simulate invalid request failure
alterPartitionManager.failIsrUpdate(Errors.INVALID_REQUEST)
// Still no ISR change and no retry
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertEquals(0, alterPartitionManager.isrUpdates.size)
assertEquals(0, alterPartitionListener.expands.get)
assertEquals(0, alterPartitionListener.shrinks.get)
assertEquals(1, alterPartitionListener.failures.get)
}
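// A follower catching up to the leader's log end offset should trigger an
// AlterPartition request; the ISR only expands once that request completes.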
@Test
def testIsrExpansion(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(Set(brokerId), partition.partitionState.isr)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 3L)
assertEquals(Set(brokerId), partition.partitionState.isr)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = 0L,
logEndOffset = 3L
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
val isrItem = alterPartitionManager.isrUpdates.head
assertEquals(isrItem.leaderAndIsr.isr, List(brokerId, remoteBrokerId))
isrItem.leaderAndIsr.isrWithBrokerEpoch.foreach { brokerState =>
// In ZK mode, the broker epochs in the leaderAndIsr should be -1.
assertEquals(-1, brokerState.brokerEpoch())
}
assertEquals(Set(brokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = 10L
)
// Complete the ISR expansion
alterPartitionManager.completeIsrUpdate(2)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(alterPartitionListener.expands.get, 1)
assertEquals(alterPartitionListener.shrinks.get, 0)
assertEquals(alterPartitionListener.failures.get, 0)
}
@Test
def testIsrNotExpandedIfUpdateFails(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId).asJava
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(Set(brokerId), partition.partitionState.isr)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
// Follower state is updated, but the ISR has not expanded
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = 10L
)
// Simulate failure callback
alterPartitionManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Still no ISR change and it doesn't retry
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(alterPartitionListener.expands.get, 0)
assertEquals(alterPartitionListener.shrinks.get, 0)
assertEquals(alterPartitionListener.failures.get, 1)
}
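// Fenced and shutting-down followers are excluded from the HWM calculation, so the
// high watermark can still advance; an unfenced out-of-sync follower holds it back.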
@ParameterizedTest
@ValueSource(strings = Array("fenced", "shutdown", "unfenced"))
def testHighWatermarkIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val shrinkedIsr = Set(brokerId)
val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager
)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false),
offsetCheckpoints,
None
),
"Expected become leader transition to succeed"
)
assertEquals(replicas.toSet, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
// Fetch to let the follower catch up to the log end offset
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Follower fetches and catches up to the log end offset.
assertReplicaState(
partition,
remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset
)
// Check that the leader updated the HWM to the LEO, which is where the follower has caught up to
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)
if (brokerState == "fenced") {
when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
} else if (brokerState == "shutdown") {
when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(true)
}
// Append records to the log as leader of the current epoch
seedLogData(log, numRecords = 10, leaderEpoch)
// The controller then shrinks the ISR
assertFalse(
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(shrinkedIsr.toList.map(Int.box).asJava)
.setPartitionEpoch(2)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false),
offsetCheckpoints,
None
),
"Expected to stay leader"
)
assertTrue(partition.isLeader)
assertEquals(shrinkedIsr, partition.partitionState.isr)
assertEquals(shrinkedIsr, partition.partitionState.maximalIsr)
assertEquals(Set.empty, partition.getOutOfSyncReplicas(partition.replicaLagTimeMaxMs))
// In the unfenced case, the HWM doesn't increase; otherwise, the HWM increases because the
// fenced or shutdown replica is not considered during the HWM calculation.
if (brokerState == "unfenced") {
assertEquals(10, partition.localLogOrException.highWatermark)
} else {
assertEquals(20, partition.localLogOrException.highWatermark)
}
}
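/**
* Verifies, in both ZK and KRaft modes, that the ISR is not expanded while the follower is
* ineligible (fenced in KRaft, not alive in ZK) and that expansion resumes once it becomes
* eligible again.
*/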
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
val kraft = quorum == "kraft"
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId)
val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
if (kraft) {
addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
}
// Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
// When using kraft, we can make the broker ineligible by fencing it.
// In ZK mode, we must mark the broker as alive for it to be eligible.
def markRemoteReplicaEligible(eligible: Boolean): Unit = {
if (kraft) {
when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
} else {
when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
}
}
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager
)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr.toList.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
markRemoteReplicaEligible(true)
// Fetch to let the follower catch up to the log end offset and
// to check if an expansion is possible.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Follower fetches and catches up to the log end offset.
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset
)
// Expansion is triggered.
assertEquals(isr, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Controller rejects the expansion because the broker is fenced or offline.
alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
// The leader reverts to the previous ISR.
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// The leader eventually learns about the fenced or offline broker.
markRemoteReplicaEligible(false)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Expansion is not triggered because the follower is fenced.
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// The broker is eventually unfenced or brought back online.
markRemoteReplicaEligible(true)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Expansion is triggered.
assertEquals(isr, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertTrue(partition.partitionState.isInflight)
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Expansion succeeds.
alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
// ISR is committed.
assertEquals(replicas.toSet, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
}
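/**
* Verifies that, with a KRaft metadata cache, ISR expansion is only attempted when the broker
* epoch sent by the follower matches the epoch in the metadata cache; a mismatched epoch blocks
* the expansion.
*/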
@Test
def testIsrCanExpandedIfBrokerEpochsMatchWithKraftMetadataCache(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId1 = brokerId + 1
val remoteBrokerId2 = brokerId + 2
val replicas = List(brokerId, remoteBrokerId1, remoteBrokerId2)
val isr = Set(brokerId, remoteBrokerId2)
val metadataCache: MetadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache.asInstanceOf[KRaftMetadataCache], replicas)
// Mark the remote broker 1 as eligible in the metadata cache.
when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId1)).thenReturn(false)
when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerShuttingDown(remoteBrokerId1)).thenReturn(false)
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager,
)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr.toList.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
// Fetch to let the follower catch up to the log end offset, but using a wrong broker epoch. The expansion should fail.
val wrongReplicaEpoch = defaultBrokerEpoch(remoteBrokerId1) + 1
fetchFollower(partition,
replicaId = remoteBrokerId1,
fetchOffset = log.logEndOffset,
replicaEpoch = Some(wrongReplicaEpoch)
)
assertReplicaState(partition, remoteBrokerId1,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset,
brokerEpoch = Some(wrongReplicaEpoch)
)
// Expansion is not triggered.
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// Fetch again, this time with the correct default broker epoch.
fetchFollower(partition,
replicaId = remoteBrokerId1,
fetchOffset = log.logEndOffset
)
// Follower should still catch up to the log end offset.
assertReplicaState(partition, remoteBrokerId1,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset
)
// Expansion is triggered.
assertEquals(isr, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
val isrUpdate = alterPartitionManager.isrUpdates.head
isrUpdate.leaderAndIsr.isrWithBrokerEpoch.foreach { brokerState =>
if (brokerState.brokerId() == remoteBrokerId2) {
// The leader has not received any fetch request from remoteBrokerId2 yet, so its broker epoch is unknown (-1).
assertEquals(-1, brokerState.brokerEpoch())
} else {
assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch())
}
}
}
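/**
* Verifies that the ISR is not expanded while the follower is in controlled shutdown and that
* expansion resumes once the broker is no longer shutting down.
*/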
@Test
def testIsrNotExpandedIfReplicaIsInControlledShutdown(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId)
val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas)
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager
)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr.toList.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
// Fetch to let the follower catch up to the log end offset and
// to check if an expansion is possible.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Follower fetches and catches up to the log end offset.
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = log.logEndOffset
)
// Expansion is triggered.
assertEquals(isr, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Controller rejects the expansion because the broker is in controlled shutdown.
alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
// The leader reverts to the previous ISR.
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// The leader eventually learns that the broker is in controlled shutdown.
when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(true)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Expansion is not triggered because the follower is in controlled shutdown.
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// The broker eventually comes back.
when(metadataCache.isBrokerShuttingDown(remoteBrokerId)).thenReturn(false)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
// Expansion is triggered.
assertEquals(isr, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertTrue(partition.partitionState.isInflight)
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Expansion succeeds.
alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 1)
// ISR is committed.
assertEquals(replicas.toSet, partition.partitionState.isr)
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
}
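/**
* Verifies that an ISR shrink which fails with a retriable NETWORK_EXCEPTION is retried and
* eventually committed, advancing the high watermark.
*/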
@Test
def testRetryShrinkIsr(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val topicId = Uuid.randomUuid()
assertTrue(makeLeader(
topicId = Some(topicId),
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
// Sleep enough time to shrink the ISR
time.sleep(partition.replicaLagTimeMaxMs + 1)
// Try to shrink the ISR
partition.maybeShrinkIsr()
assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId))
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
// The shrink fails and we retry
alterPartitionManager.failIsrUpdate(Errors.NETWORK_EXCEPTION)
assertEquals(0, alterPartitionListener.shrinks.get)
assertEquals(1, alterPartitionListener.failures.get)
assertEquals(1, partition.getPartitionEpoch)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertEquals(0L, partition.localLogOrException.highWatermark)
// The shrink succeeds after retrying
alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
assertEquals(1, alterPartitionListener.shrinks.get)
assertEquals(2, partition.getPartitionEpoch)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(Set(brokerId), partition.partitionState.isr)
assertEquals(Set(brokerId), partition.partitionState.maximalIsr)
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)
}
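/**
* Verifies that maybeShrinkIsr removes only the replica that has not fetched within
* replicaLagTimeMaxMs and that the high watermark advances once the shrink is committed.
*/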
@Test
def testMaybeShrinkIsr(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId1 = brokerId + 1
val remoteBrokerId2 = brokerId + 2
val replicas = Seq(brokerId, remoteBrokerId1, remoteBrokerId2)
val isr = Seq(brokerId, remoteBrokerId1, remoteBrokerId2)
val initializeTimeMs = time.milliseconds()
val metadataCache = mock(classOf[KRaftMetadataCache])
addBrokerEpochToMockMetadataCache(metadataCache, replicas.toList)
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager
)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr.toList.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true),
offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(0L, partition.localLogOrException.highWatermark)
fetchFollower(partition, replicaId = remoteBrokerId1, fetchOffset = log.logEndOffset)
assertReplicaState(partition, remoteBrokerId2,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
// On initialization, the replica is considered caught up and should not be removed
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.isr)
// If enough time passes without a fetch update, the ISR should shrink
time.sleep(partition.replicaLagTimeMaxMs + 1)
// Shrink the ISR
partition.maybeShrinkIsr()
assertEquals(0, alterPartitionListener.shrinks.get)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertEquals(alterPartitionManager.isrUpdates.head.leaderAndIsr.isr, List(brokerId, remoteBrokerId1))
val isrUpdate = alterPartitionManager.isrUpdates.head
isrUpdate.leaderAndIsr.isrWithBrokerEpoch.foreach { brokerState =>
assertEquals(defaultBrokerEpoch(brokerState.brokerId()), brokerState.brokerEpoch())
}
assertEquals(Set(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId1, remoteBrokerId2), partition.partitionState.maximalIsr)
assertEquals(0L, partition.localLogOrException.highWatermark)
// After the ISR shrink completes, the ISR state should be updated and the
// high watermark should be advanced
alterPartitionManager.completeIsrUpdate(newPartitionEpoch = 2)
assertEquals(1, alterPartitionListener.shrinks.get)
assertEquals(2, partition.getPartitionEpoch)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(Set(brokerId, remoteBrokerId1), partition.partitionState.isr)
assertEquals(Set(brokerId, remoteBrokerId1), partition.partitionState.maximalIsr)
assertEquals(log.logEndOffset, partition.localLogOrException.highWatermark)
}
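/**
* Verifies that a LeaderAndIsr update arriving while an AlterIsr request is in flight resets the
* pending ISR state, and that no new shrink is submitted until the outstanding response arrives.
*/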
@Test
def testAlterIsrLeaderAndIsrRace(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val initializeTimeMs = time.milliseconds()
assertTrue(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
// Shrink the ISR
time.sleep(partition.replicaLagTimeMaxMs + 1)
partition.maybeShrinkIsr()
assertTrue(partition.partitionState.isInflight)
// Become leader again, reset the ISR state
assertFalse(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 2,
isNew = false
))
assertEquals(0L, partition.localLogOrException.highWatermark)
assertFalse(partition.partitionState.isInflight, "ISR should be committed and not inflight")
// Try the shrink again; it should not be submitted until the AlterIsr response arrives
time.sleep(partition.replicaLagTimeMaxMs + 1)
partition.maybeShrinkIsr()
assertFalse(partition.partitionState.isInflight, "ISR should still be committed and not inflight")
// Complete the AlterIsr update and now we can make modifications again
alterPartitionManager.completeIsrUpdate(10)
partition.maybeShrinkIsr()
assertTrue(partition.partitionState.isInflight, "ISR should be pending a shrink")
}
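/**
* Verifies that the ISR is not shrunk when the follower was caught up to the log end offset at the
* time of its previous fetch, even if more than replicaLagTimeMaxMs has elapsed since then.
*/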
@Test
def testShouldNotShrinkIsrIfPreviousFetchIsCaughtUp(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val initializeTimeMs = time.milliseconds()
assertTrue(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
// There is a short delay before the first fetch. The follower is not yet caught up to the log end.
time.sleep(5000)
val firstFetchTimeMs = time.milliseconds()
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 5L, fetchTimeMs = firstFetchTimeMs)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = 0L,
logEndOffset = 5L
)
assertEquals(5L, partition.localLogOrException.highWatermark)
// Some new data is appended, but the follower catches up to the old end offset.
// The total elapsed time from initialization is larger than the max allowed replica lag.
time.sleep(5001)
seedLogData(log, numRecords = 5, leaderEpoch = leaderEpoch)
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L, fetchTimeMs = time.milliseconds())
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = firstFetchTimeMs,
logStartOffset = 0L,
logEndOffset = 10L
)
assertEquals(10L, partition.localLogOrException.highWatermark)
// The ISR should not be shrunk because the follower has caught up with the leader at the
// time of the first fetch.
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
}
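/**
* Verifies that the ISR is not shrunk when the follower is already caught up to the leader's log
* end offset, regardless of how much time has passed since its last fetch.
*/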
@Test
def testShouldNotShrinkIsrIfFollowerCaughtUpToLogEnd(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val initializeTimeMs = time.milliseconds()
assertTrue(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
// The follower catches up to the log end immediately.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = 10L
)
assertEquals(10L, partition.localLogOrException.highWatermark)
// Sleep longer than the max allowed follower lag
time.sleep(30001)
// The ISR should not be shrunk because the follower is caught up to the leader's log end
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.isr)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
}
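/**
* Verifies that a failed shrink request leaves the partition in PendingShrinkIsr state with the
* committed ISR unchanged and does not advance the high watermark.
*/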
@Test
def testIsrNotShrunkIfUpdateFails(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId, remoteBrokerId)
val initializeTimeMs = time.milliseconds()
assertTrue(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = initializeTimeMs,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
time.sleep(30001)
// Enqueue an AlterIsr that will fail
partition.maybeShrinkIsr()
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertEquals(0L, partition.localLogOrException.highWatermark)
// Simulate failure callback
alterPartitionManager.failIsrUpdate(Errors.INVALID_UPDATE_VERSION)
// Ensure ISR hasn't changed
assertEquals(partition.partitionState.getClass, classOf[PendingShrinkIsr])
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
assertEquals(alterPartitionManager.isrUpdates.size, 0)
assertEquals(0L, partition.localLogOrException.highWatermark)
}
@Test
def testAlterIsrNewLeaderElected(): Unit = {
handleAlterIsrFailure(Errors.NEW_LEADER_ELECTED,
(brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId))
assertEquals(alterPartitionManager.isrUpdates.size, 0)
})
}
@Test
def testAlterIsrUnknownTopic(): Unit = {
handleAlterIsrFailure(Errors.UNKNOWN_TOPIC_OR_PARTITION,
(brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId))
assertEquals(alterPartitionManager.isrUpdates.size, 0)
})
}
@Test
def testAlterIsrInvalidVersion(): Unit = {
handleAlterIsrFailure(Errors.INVALID_UPDATE_VERSION,
(brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId))
assertEquals(alterPartitionManager.isrUpdates.size, 0)
})
}
@Test
def testAlterIsrUnexpectedError(): Unit = {
handleAlterIsrFailure(Errors.UNKNOWN_SERVER_ERROR,
(brokerId: Int, remoteBrokerId: Int, partition: Partition) => {
// We retry these
assertEquals(partition.partitionState.isr, Set(brokerId))
assertEquals(partition.partitionState.maximalIsr, Set(brokerId, remoteBrokerId))
assertEquals(alterPartitionManager.isrUpdates.size, 1)
})
}
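/**
* Drives an ISR expansion attempt, fails the resulting AlterIsr request with the given error, and
* lets the callback assert the expected partition state.
*/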
def handleAlterIsrFailure(error: Errors, callback: (Int, Int, Partition) => Unit): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val remoteBrokerId = brokerId + 1
val replicas = Seq(brokerId, remoteBrokerId)
val isr = Seq(brokerId)
assertTrue(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(10L, partition.localLogOrException.highWatermark)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
// This will attempt to expand the ISR
val firstFetchTimeMs = time.milliseconds()
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = 10L, fetchTimeMs = firstFetchTimeMs)
// Follower state is updated, but the ISR has not expanded
assertEquals(Set(brokerId), partition.inSyncReplicaIds)
assertEquals(Set(brokerId, remoteBrokerId), partition.partitionState.maximalIsr)
assertEquals(alterPartitionManager.isrUpdates.size, 1)
assertReplicaState(partition, remoteBrokerId,
lastCaughtUpTimeMs = firstFetchTimeMs,
logStartOffset = 0L,
logEndOffset = 10L
)
// Failure
alterPartitionManager.failIsrUpdate(error)
callback(brokerId, remoteBrokerId, partition)
}
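/**
* Builds a ClientResponse wrapping an AlterPartitionResponse with the given partition-level error
* code, ISR, leader epoch and partition epoch.
*/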
private def createClientResponseWithAlterPartitionResponse(
topicPartition: TopicPartition,
partitionErrorCode: Short,
isr: List[Int] = List.empty,
leaderEpoch: Int = 0,
partitionEpoch: Int = 0
): ClientResponse = {
val alterPartitionResponseData = new AlterPartitionResponseData()
val topicResponse = new AlterPartitionResponseData.TopicData()
.setTopicName(topicPartition.topic)
topicResponse.partitions.add(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition)
.setIsr(isr.map(Integer.valueOf).asJava)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(partitionEpoch)
.setErrorCode(partitionErrorCode))
alterPartitionResponseData.topics.add(topicResponse)
val alterPartitionResponse = new AlterPartitionResponse(alterPartitionResponseData)
new ClientResponse(new RequestHeader(ApiKeys.ALTER_PARTITION, 0, "client", 1),
null, null, 0, 0, false, null, null, alterPartitionResponse)
}
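/**
* Verifies that the AlterPartition request is retried after a retriable UNKNOWN_SERVER_ERROR
* response from the controller and that the expanded ISR is committed once the retry succeeds.
*/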
@Test
def testPartitionShouldRetryAlterPartitionRequest(): Unit = {
val mockChannelManager = mock(classOf[BrokerToControllerChannelManager])
val alterPartitionManager = new DefaultAlterPartitionManager(
controllerChannelManager = mockChannelManager,
scheduler = mock(classOf[KafkaScheduler]),
time = time,
brokerId = brokerId,
brokerEpochSupplier = () => 0,
metadataVersionSupplier = () => MetadataVersion.IBP_3_0_IV0
)
partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = interBrokerProtocolVersion,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager)
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val follower3 = brokerId + 3
val replicas = Seq(brokerId, follower1, follower2, follower3)
val isr = Seq(brokerId, follower1, follower2)
val partitionEpoch = 1
doNothing().when(delayedOperations).checkAndCompleteAll()
// Fail the first alter partition request with a retryable error to trigger a retry from the partition callback
val alterPartitionResponseWithUnknownServerError =
createClientResponseWithAlterPartitionResponse(topicPartition, Errors.UNKNOWN_SERVER_ERROR.code)
// Complete the ISR expansion
val alterPartitionResponseWithoutError =
createClientResponseWithAlterPartitionResponse(topicPartition, Errors.NONE.code, List(brokerId, follower1, follower2, follower3), leaderEpoch, partitionEpoch + 1)
when(mockChannelManager.sendRequest(any(), any()))
.thenAnswer { invocation =>
val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithUnknownServerError)
}
.thenAnswer { invocation =>
val controllerRequestCompletionHandler = invocation.getArguments()(1).asInstanceOf[ControllerRequestCompletionHandler]
controllerRequestCompletionHandler.onComplete(alterPartitionResponseWithoutError)
}
assertTrue(makeLeader(
topicId = None,
controllerEpoch,
leaderEpoch,
isr,
replicas,
partitionEpoch,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
// Expand ISR
fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.isr)
assertEquals(partitionEpoch + 1, partition.getPartitionEpoch)
// Verify that the AlterPartition request was sent twice
verify(mockChannelManager, times(2)).sendRequest(any(), any())
// After the retry, the partition state should be committed
assertFalse(partition.partitionState.isInflight)
}
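/**
* Verifies that only one AlterIsr request can be in flight at a time: a shrink attempt while an
* expansion is still pending does not enqueue another update.
*/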
@Test
def testSingleInFlightAlterIsr(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val follower3 = brokerId + 3
val replicas = Seq(brokerId, follower1, follower2, follower3)
val isr = Seq(brokerId, follower1, follower2)
doNothing().when(delayedOperations).checkAndCompleteAll()
assertTrue(makeLeader(
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
// Expand ISR
fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
assertEquals(Set(brokerId, follower1, follower2), partition.partitionState.isr)
assertEquals(Set(brokerId, follower1, follower2, follower3), partition.partitionState.maximalIsr)
// One AlterIsr request in-flight
assertEquals(alterPartitionManager.isrUpdates.size, 1)
// Try to modify ISR again, should do nothing
time.sleep(partition.replicaLagTimeMaxMs + 1)
partition.maybeShrinkIsr()
assertEquals(alterPartitionManager.isrUpdates.size, 1)
}
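/**
* Verifies that the ZK-based AlterPartitionManager commits an ISR expansion through its
* asynchronous callback, using a real scheduler.
*/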
@Test
def testZkIsrManagerAsyncCallback(): Unit = {
// We need a real scheduler here so that the ISR write lock works properly
val scheduler = new KafkaScheduler(1, true, "zk-isr-test")
scheduler.startup()
val kafkaZkClient = mock(classOf[KafkaZkClient])
doAnswer(_ => (true, 2))
.when(kafkaZkClient)
.conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any())
val zkIsrManager = AlterPartitionManager(scheduler, time, kafkaZkClient)
zkIsrManager.start()
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = IBP_2_6_IV0, // shouldn't matter, but set this to a ZK isr version
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
zkIsrManager)
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerEpoch = 0
val leaderEpoch = 5
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val follower3 = brokerId + 3
val replicas = Seq(brokerId, follower1, follower2, follower3)
val isr = Seq(brokerId, follower1, follower2)
doNothing().when(delayedOperations).checkAndCompleteAll()
assertTrue(makeLeader(
partition = partition,
topicId = None,
controllerEpoch = controllerEpoch,
leaderEpoch = leaderEpoch,
isr = isr,
replicas = replicas,
partitionEpoch = 1,
isNew = true
))
assertEquals(0L, partition.localLogOrException.highWatermark)
// Expand ISR
fetchFollower(partition, replicaId = follower3, fetchOffset = 10L)
// Wait for the async ISR update to complete to avoid a race
TestUtils.waitUntilTrue(() => !partition.partitionState.isInflight, "Expected ISR state to be committed", 100)
partition.partitionState match {
case CommittedPartitionState(isr, _) => assertEquals(Set(brokerId, follower1, follower2, follower3), isr)
case _ => fail("Expected a committed ISR following Zk expansion")
}
scheduler.shutdown()
}
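/**
* Verifies that the high watermark is initialized from the offset checkpoint when the replica
* becomes leader.
*/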
@Test
def testUseCheckpointToInitializeHighWatermark(): Unit = {
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 6, leaderEpoch = 5)
when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
.thenReturn(Some(4L))
val controllerEpoch = 3
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(6)
.setIsr(replicas)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(false)
partition.makeLeader(leaderState, offsetCheckpoints, None)
assertEquals(4, partition.localLogOrException.highWatermark)
}
@Test
def testTopicIdAndPartitionMetadataFileForLeader(): Unit = {
val controllerEpoch = 3
val leaderEpoch = 5
val topicId = Uuid.randomUuid()
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(false)
partition.makeLeader(leaderState, offsetCheckpoints, Some(topicId))
checkTopicId(topicId, partition)
// Create new Partition object for same topicPartition
val partition2 = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager)
// partition2 should not yet be associated with the log, but should be able to get the topic ID
assertTrue(partition2.topicId.isDefined)
assertEquals(topicId, partition2.topicId.get)
assertFalse(partition2.log.isDefined)
// Calling makeLeader with a new topic ID should not overwrite the old topic ID. We should get an InconsistentTopicIdException.
// This scenario should not occur, since the topic ID check will fail.
assertThrows(classOf[InconsistentTopicIdException], () => partition2.makeLeader(leaderState, offsetCheckpoints, Some(Uuid.randomUuid())))
// Calling makeLeader with no topic ID should not overwrite the old topic ID. We should get the original log.
partition2.makeLeader(leaderState, offsetCheckpoints, None)
checkTopicId(topicId, partition2)
}
@Test
def testTopicIdAndPartitionMetadataFileForFollower(): Unit = {
val controllerEpoch = 3
val leaderEpoch = 5
val topicId = Uuid.randomUuid()
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(replicas)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(false)
partition.makeFollower(leaderState, offsetCheckpoints, Some(topicId))
checkTopicId(topicId, partition)
// Create new Partition object for same topicPartition
val partition2 = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
logManager,
alterPartitionManager)
// partition2 should not yet be associated with the log, but should be able to get the topic ID
assertTrue(partition2.topicId.isDefined)
assertEquals(topicId, partition2.topicId.get)
assertFalse(partition2.log.isDefined)
// Calling makeFollower with a new topic ID should not overwrite the old topic ID. We should get an InconsistentTopicIdException.
// This scenario should not occur, since the topic ID check will fail.
assertThrows(classOf[InconsistentTopicIdException], () => partition2.makeFollower(leaderState, offsetCheckpoints, Some(Uuid.randomUuid())))
// Calling makeFollower with no topic ID should not overwrite the old topic ID. We should get the original log.
partition2.makeFollower(leaderState, offsetCheckpoints, None)
checkTopicId(topicId, partition2)
}
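/**
* Asserts that the partition, its log, and the partition metadata file all carry the expected
* topic ID.
*/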
def checkTopicId(expectedTopicId: Uuid, partition: Partition): Unit = {
assertTrue(partition.topicId.isDefined)
assertEquals(expectedTopicId, partition.topicId.get)
assertTrue(partition.log.isDefined)
val log = partition.log.get
assertEquals(expectedTopicId, log.topicId.get)
assertTrue(log.partitionMetadataFile.get.exists())
assertEquals(expectedTopicId, log.partitionMetadataFile.get.read().topicId)
}
@Test
def testAddAndRemoveMetrics(): Unit = {
val metricsToCheck = List(
"UnderReplicated",
"UnderMinIsr",
"InSyncReplicasCount",
"ReplicasCount",
"LastStableOffsetLag",
"AtMinIsr")
def getMetric(metric: String): Option[Metric] = {
KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.find { case (metricName, _) =>
metricName.getName == metric && metricName.getType == "Partition"
}.map(_._2)
}
assertTrue(metricsToCheck.forall(getMetric(_).isDefined))
Partition.removeMetrics(topicPartition)
assertEquals(Set(), KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.keySet.filter(_.getType == "Partition"))
}
@Test
def testUnderReplicatedPartitionsCorrectSemantics(): Unit = {
val controllerEpoch = 3
val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava
val isr = List[Integer](brokerId, brokerId + 1).asJava
var leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(6)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(false)
partition.makeLeader(leaderState, offsetCheckpoints, None)
assertTrue(partition.isUnderReplicated)
leaderState = leaderState.setIsr(replicas)
partition.makeLeader(leaderState, offsetCheckpoints, None)
assertFalse(partition.isUnderReplicated)
}
@Test
def testUpdateAssignmentAndIsr(): Unit = {
val topicPartition = new TopicPartition("test", 1)
val partition = new Partition(
topicPartition, 1000, MetadataVersion.latest, 0, () => defaultBrokerEpoch(0),
new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]),
mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager]))
val replicas = Seq(0, 1, 2, 3)
val followers = Seq(1, 2, 3)
val isr = Set(0, 1, 2, 3)
val adding = Seq(4, 5)
val removing = Seq(1, 2)
// Test with ongoing reassignment
partition.updateAssignmentAndIsr(
replicas,
isLeader = true,
isr,
adding,
removing,
LeaderRecoveryState.RECOVERED
)
assertTrue(partition.assignmentState.isInstanceOf[OngoingReassignmentState], "The assignmentState is not OngoingReassignmentState")
assertEquals(replicas, partition.assignmentState.replicas)
assertEquals(isr, partition.partitionState.isr)
assertEquals(adding, partition.assignmentState.asInstanceOf[OngoingReassignmentState].addingReplicas)
assertEquals(removing, partition.assignmentState.asInstanceOf[OngoingReassignmentState].removingReplicas)
assertEquals(followers, partition.remoteReplicas.map(_.brokerId))
// Test with simple assignment
val replicas2 = Seq(0, 3, 4, 5)
val followers2 = Seq(3, 4, 5)
val isr2 = Set(0, 3, 4, 5)
partition.updateAssignmentAndIsr(
replicas2,
isLeader = true,
isr2,
Seq.empty,
Seq.empty,
LeaderRecoveryState.RECOVERED
)
assertTrue(partition.assignmentState.isInstanceOf[SimpleAssignmentState], "The assignmentState is not SimpleAssignmentState")
assertEquals(replicas2, partition.assignmentState.replicas)
assertEquals(isr2, partition.partitionState.isr)
assertEquals(followers2, partition.remoteReplicas.map(_.brokerId))
// Test with no followers
val replicas3 = Seq(1, 2, 3, 4)
partition.updateAssignmentAndIsr(
replicas3,
isLeader = false,
Set.empty,
Seq.empty,
Seq.empty,
LeaderRecoveryState.RECOVERED
)
assertTrue(partition.assignmentState.isInstanceOf[SimpleAssignmentState], "The assignmentState is not SimpleAssignmentState")
assertEquals(replicas3, partition.assignmentState.replicas)
assertEquals(Set.empty, partition.partitionState.isr)
assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId))
}
/**
* Test that when a log is being initialized, its config remains untouched after initialization is done.
*/
@Test
def testLogConfigNotDirty(): Unit = {
logManager.shutdown()
val spyConfigRepository = spy(configRepository)
logManager = TestUtils.createLogManager(
logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, configRepository = spyConfigRepository,
cleanerConfig = new CleanerConfig(false), time = time)
val spyLogManager = spy(logManager)
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
spyLogManager,
alterPartitionManager)
partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
// Validate that initializingLog and finishedInitializingLog were called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.any())
// We should retrieve configs only once
verify(spyConfigRepository, times(1)).topicConfig(topicPartition.topic())
}
/**
* Test that when a log is being initialized, its config gets reloaded if the topic config is changed
* before initialization is done.
*/
@Test
def testLogConfigDirtyAsTopicUpdated(): Unit = {
logManager.shutdown()
val spyConfigRepository = spy(configRepository)
logManager = TestUtils.createLogManager(
logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, configRepository = spyConfigRepository,
cleanerConfig = new CleanerConfig(false), time = time)
val spyLogManager = spy(logManager)
doAnswer((_: InvocationOnMock) => {
logManager.initializingLog(topicPartition)
logManager.topicConfigUpdated(topicPartition.topic())
}).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
spyLogManager,
alterPartitionManager)
partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
// Validate that initializingLog and finishedInitializingLog were called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.any())
// We should retrieve configs twice: once before the log is created, and a second time when we
// find the log config is dirty and refresh it.
verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
}
/**
* Test that when a log is being initialized, its config gets reloaded if the broker config is changed
* before initialization is done.
*/
@Test
def testLogConfigDirtyAsBrokerUpdated(): Unit = {
logManager.shutdown()
val spyConfigRepository = spy(configRepository)
logManager = TestUtils.createLogManager(
logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, configRepository = spyConfigRepository,
cleanerConfig = new CleanerConfig(false), time = time)
logManager.startup(Set.empty)
val spyLogManager = spy(logManager)
doAnswer((_: InvocationOnMock) => {
logManager.initializingLog(topicPartition)
logManager.brokerConfigUpdated()
}).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
val partition = new Partition(topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = MetadataVersion.latest,
localBrokerId = brokerId,
() => defaultBrokerEpoch(brokerId),
time,
alterPartitionListener,
delayedOperations,
metadataCache,
spyLogManager,
alterPartitionManager)
partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
// Validate that initializingLog and finishedInitializingLog were called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.any())
// We should get configs twice: once before the log is created, and a second time when we
// find the log config is dirty and refresh it.
verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
}
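/**
* Verifies that a makeLeader call with the same leader epoch but a newer partition epoch (as can
* happen during a KRaft reassignment) does not reset the follower's replica state.
*/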
@Test
def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
val controllerEpoch = 3
val leaderId = brokerId
val followerId = brokerId + 1
val replicas = List(leaderId, followerId)
val leaderEpoch = 8
val topicId = Uuid.randomUuid()
val initialLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId).map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Set(leaderId), partition.partitionState.isr)
// Follower's state is initialized with unknown offset because it is not
// in the ISR.
assertReplicaState(partition, followerId,
lastCaughtUpTimeMs = 0L,
logStartOffset = UnifiedLog.UnknownOffset,
logEndOffset = UnifiedLog.UnknownOffset
)
// Follower fetches and updates its replica state.
fetchFollower(partition, replicaId = followerId, fetchOffset = 0L)
assertReplicaState(partition, followerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = 0L
)
// makeLeader is called again with the same leader epoch but with
// a newer partition epoch. This can happen in KRaft when a partition
// is reassigned. The leader epoch is not bumped when we add replicas.
val updatedLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId).map(Int.box).asJava)
.setPartitionEpoch(2)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(2, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Set(leaderId), partition.partitionState.isr)
// Follower's state has not been reset.
assertReplicaState(partition, followerId,
lastCaughtUpTimeMs = time.milliseconds(),
logStartOffset = 0L,
logEndOffset = 0L
)
}
@Test
def testDoNotUpdateEpochStartOffsetIfLeaderEpochIsNotBumped(): Unit = {
val controllerEpoch = 3
val leaderId = brokerId
val followerId = brokerId + 1
val replicas = List(leaderId, followerId)
val leaderEpoch = 8
val topicId = Uuid.randomUuid()
val initialLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId).map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Set(leaderId), partition.partitionState.isr)
assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
val leaderLog = partition.localLogOrException
assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry))
// Write to the log to increment the log end offset.
leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k1".getBytes, "v1".getBytes)
), leaderEpoch = leaderEpoch)
// makeLeader is called again with the same leader epoch but with
// a newer partition epoch.
val updatedLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId).map(Int.box).asJava)
.setPartitionEpoch(2)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(2, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(Set(leaderId), partition.partitionState.isr)
assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
assertEquals(Optional.of(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.asJava.flatMap(_.latestEntry))
}
@Test
def testIgnoreLeaderPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
val controllerEpoch = 3
val leaderId = brokerId
val replicas = List(leaderId)
val leaderEpoch = 8
val topicId = Uuid.randomUuid()
val initialLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId).map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
// makeLeader is called again with the same leader epoch but with
// an older partition epoch.
val updatedLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(leaderId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId).map(Int.box).asJava)
.setPartitionEpoch(0)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
}
@Test
def testIgnoreFollowerPartitionStateChangeWithOlderPartitionEpoch(): Unit = {
val controllerEpoch = 3
val leaderId = brokerId
val followerId = brokerId + 1
val replicas = List(leaderId, followerId)
val leaderEpoch = 8
val topicId = Uuid.randomUuid()
val initialFollowerState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(followerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId, followerId).map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeFollower(initialFollowerState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
// makeFollower is called again with the same leader epoch but without
// a newer partition epoch.
val updatedFollowerState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(followerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(List(leaderId, followerId).map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertFalse(partition.makeFollower(updatedFollowerState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
}
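/**
* Verifies that when the local replica transitions from leader to follower, it clears its ISR view
* and remote replica states while keeping the replica assignment.
*/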
@Test
def testFollowerShouldNotHaveAnyRemoteReplicaStates(): Unit = {
val controllerEpoch = 3
val localReplica = brokerId
val remoteReplica1 = brokerId + 1
val remoteReplica2 = brokerId + 2
val replicas = List(localReplica, remoteReplica1, remoteReplica2)
val topicId = Uuid.randomUuid()
// The local replica is the leader.
val initialLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(localReplica)
.setLeaderEpoch(1)
.setIsr(replicas.map(Int.box).asJava)
.setPartitionEpoch(1)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true)
assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(1, partition.getPartitionEpoch)
assertEquals(1, partition.getLeaderEpoch)
assertEquals(Some(localReplica), partition.leaderReplicaIdOpt)
assertEquals(replicas.toSet, partition.partitionState.isr)
assertEquals(Seq(remoteReplica1, remoteReplica2), partition.remoteReplicas.map(_.brokerId).toSeq)
assertEquals(replicas, partition.assignmentState.replicas)
// The local replica becomes a follower.
val updatedLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(remoteReplica1)
.setLeaderEpoch(2)
.setIsr(replicas.map(Int.box).asJava)
.setPartitionEpoch(2)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false)
assertTrue(partition.makeFollower(updatedLeaderState, offsetCheckpoints, Some(topicId)))
assertEquals(2, partition.getPartitionEpoch)
assertEquals(2, partition.getLeaderEpoch)
assertEquals(Some(remoteReplica1), partition.leaderReplicaIdOpt)
assertEquals(Set.empty, partition.partitionState.isr)
assertEquals(Seq.empty, partition.remoteReplicas.map(_.brokerId).toSeq)
assertEquals(replicas, partition.assignmentState.replicas)
}
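/**
* Verifies that registered PartitionListeners are notified of high watermark changes and that a
* removed listener stops receiving updates.
*/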
@Test
def testAddAndRemoveListeners(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
topicId = None)
val listener1 = new MockPartitionListener()
val listener2 = new MockPartitionListener()
assertTrue(partition.maybeAddListener(listener1))
listener1.verify()
partition.appendRecordsToLeader(
records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
origin = AppendOrigin.CLIENT,
requiredAcks = 0,
requestLocal = RequestLocal.NoCaching
)
listener1.verify()
listener2.verify()
assertTrue(partition.maybeAddListener(listener2))
listener2.verify()
partition.appendRecordsToLeader(
records = TestUtils.records(List(new SimpleRecord("k2".getBytes, "v2".getBytes))),
origin = AppendOrigin.CLIENT,
requiredAcks = 0,
requestLocal = RequestLocal.NoCaching
)
fetchFollower(
partition = partition,
replicaId = brokerId + 1,
fetchOffset = partition.localLogOrException.logEndOffset
)
listener1.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
partition.removeListener(listener1)
partition.appendRecordsToLeader(
records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
origin = AppendOrigin.CLIENT,
requiredAcks = 0,
requestLocal = RequestLocal.NoCaching
)
fetchFollower(
partition = partition,
replicaId = brokerId + 1,
fetchOffset = partition.localLogOrException.logEndOffset
)
listener1.verify()
listener2.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
}
@Test
def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
topicId = None)
partition.delete()
assertFalse(partition.maybeAddListener(new MockPartitionListener()))
}
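// Verifies that a listener observes the high watermark advancing after a follower
// fetch and resetting to zero after truncateFullyAndStartAt.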
@Test
def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
topicId = None)
val listener = new MockPartitionListener()
assertTrue(partition.maybeAddListener(listener))
listener.verify()
partition.appendRecordsToLeader(
records = TestUtils.records(List(new SimpleRecord("k1".getBytes, "v1".getBytes))),
origin = AppendOrigin.CLIENT,
requiredAcks = 0,
requestLocal = RequestLocal.NoCaching
)
listener.verify()
fetchFollower(
partition = partition,
replicaId = brokerId + 1,
fetchOffset = partition.localLogOrException.logEndOffset
)
listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
partition.truncateFullyAndStartAt(0L, false)
listener.verify(expectedHighWatermark = 0L)
}
@Test
def testPartitionListenerWhenPartitionFailed(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
topicId = None)
val listener = new MockPartitionListener()
assertTrue(partition.maybeAddListener(listener))
listener.verify()
partition.markOffline()
listener.verify(expectedFailed = true)
}
@Test
def testPartitionListenerWhenPartitionIsDeleted(): Unit = {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, topicId = None)
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
topicId = None)
val listener = new MockPartitionListener()
assertTrue(partition.maybeAddListener(listener))
listener.verify()
partition.delete()
listener.verify(expectedDeleted = true)
}
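// Verifies that a listener keeps receiving high watermark updates after the current
// log is replaced by the future log during a log directory move.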
@Test
def testPartitionListenerWhenCurrentIsReplacedWithFutureLog(): Unit = {
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints, topicId = None)
assertTrue(partition.log.isDefined)
partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(brokerId)
.setLeaderEpoch(0)
.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
.setPartitionEpoch(1)
.setIsNew(true),
offsetCheckpoints,
topicId = None)
val listener = new MockPartitionListener()
assertTrue(partition.maybeAddListener(listener))
listener.verify()
val records = TestUtils.records(List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
partition.appendRecordsToLeader(
records = records,
origin = AppendOrigin.CLIENT,
requiredAcks = 0,
requestLocal = RequestLocal.NoCaching
)
listener.verify()
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
assertTrue(partition.futureLog.isDefined)
val futureLog = partition.futureLog.get
partition.appendRecordsToFollowerOrFutureReplica(
records = records,
isFuture = true
)
listener.verify()
assertTrue(partition.maybeReplaceCurrentWithFutureReplica())
assertEquals(futureLog, partition.log.get)
partition.appendRecordsToLeader(
records = TestUtils.records(List(new SimpleRecord("k3".getBytes, "v3".getBytes))),
origin = AppendOrigin.CLIENT,
requiredAcks = 0,
requestLocal = RequestLocal.NoCaching
)
fetchFollower(
partition = partition,
replicaId = brokerId + 1,
fetchOffset = partition.localLogOrException.logEndOffset
)
listener.verify(expectedHighWatermark = partition.localLogOrException.logEndOffset)
}
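// Verifies that hasOngoingTransaction only returns true once transactional records
// (as opposed to idempotent-only records) have been appended for the producer.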
@Test
def testHasOngoingTransaction(): Unit = {
val controllerEpoch = 0
val leaderEpoch = 5
val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas
val producerId = 22L
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
assertTrue(partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed")
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertFalse(partition.hasOngoingTransaction(producerId))
val idempotentRecords = createIdempotentRecords(List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes),
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L,
producerId = producerId)
partition.appendRecordsToLeader(idempotentRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
assertFalse(partition.hasOngoingTransaction(producerId))
val transactionRecords = createTransactionalRecords(List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes),
new SimpleRecord("k3".getBytes, "v3".getBytes)),
baseOffset = 0L,
baseSequence = 3,
producerId = producerId)
partition.appendRecordsToLeader(transactionRecords, origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
assertTrue(partition.hasOngoingTransaction(producerId))
}
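// Creates the local log if needed, transitions the partition to leader with the
// given state, asserts the resulting leader state, and returns makeLeader's result.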
private def makeLeader(
topicId: Option[Uuid],
controllerEpoch: Int,
leaderEpoch: Int,
isr: Seq[Int],
replicas: Seq[Int],
partitionEpoch: Int,
isNew: Boolean,
partition: Partition = partition
): Boolean = {
partition.createLogIfNotExists(
isNew = isNew,
isFutureReplica = false,
offsetCheckpoints,
topicId
)
val newLeader = partition.makeLeader(
new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr.map(Int.box).asJava)
.setPartitionEpoch(partitionEpoch)
.setReplicas(replicas.map(Int.box).asJava)
.setIsNew(isNew),
offsetCheckpoints,
topicId
)
assertTrue(partition.isLeader)
assertFalse(partition.partitionState.isInflight)
assertEquals(topicId, partition.topicId)
assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(isr.toSet, partition.partitionState.isr)
assertEquals(isr.toSet, partition.partitionState.maximalIsr)
assertEquals(partitionEpoch, partition.getPartitionEpoch)
newLeader
}
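// Appends numRecords single-record batches ("k$i" -> "v$i") to the log as leader
// under the given leader epoch.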
private def seedLogData(log: UnifiedLog, numRecords: Int, leaderEpoch: Int): Unit = {
for (i <- 0 until numRecords) {
val records = MemoryRecords.withRecords(0L, CompressionType.NONE, leaderEpoch,
new SimpleRecord(s"k$i".getBytes, s"v$i".getBytes))
log.appendAsLeader(records, leaderEpoch)
}
}
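// A UnifiedLog whose follower appends block on a semaphore before delegating,
// letting tests control exactly when an append is allowed to complete.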
private class SlowLog(
log: UnifiedLog,
logStartOffset: Long,
localLog: LocalLog,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
appendSemaphore: Semaphore
) extends UnifiedLog(
logStartOffset,
localLog,
new BrokerTopicStats,
log.producerIdExpirationCheckIntervalMs,
leaderEpochCache,
producerStateManager,
_topicId = None,
keepPartitionMetadataFile = true) {
override def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
appendSemaphore.acquire()
super.appendAsFollower(records)
}
}
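// Asserts the tracked state of the given remote replica: last caught-up time,
// log end offset, log start offset and, if provided, the broker epoch.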
private def assertReplicaState(
partition: Partition,
replicaId: Int,
lastCaughtUpTimeMs: Long,
logEndOffset: Long,
logStartOffset: Long,
brokerEpoch: Option[Long] = Option.empty
): Unit = {
partition.getReplica(replicaId) match {
case Some(replica) =>
val replicaState = replica.stateSnapshot
assertEquals(lastCaughtUpTimeMs, replicaState.lastCaughtUpTimeMs,
"Unexpected Last Caught Up Time")
assertEquals(logEndOffset, replicaState.logEndOffset,
"Unexpected Log End Offset")
assertEquals(logStartOffset, replicaState.logStartOffset,
"Unexpected Log Start Offset")
if (brokerEpoch.isDefined) {
assertEquals(brokerEpoch.get, replicaState.brokerEpoch.get,
"brokerEpochs mismatch")
}
case None =>
fail(s"Replica $replicaId not found.")
}
}
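// Performs a consumer fetch against the partition; follower fetch state is not updated.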
private def fetchConsumer(
partition: Partition,
fetchOffset: Long,
leaderEpoch: Option[Int],
clientMetadata: Option[ClientMetadata],
maxBytes: Int = Int.MaxValue,
lastFetchedEpoch: Option[Int] = None,
fetchTimeMs: Long = time.milliseconds(),
topicId: Uuid = Uuid.ZERO_UUID,
isolation: FetchIsolation = FetchIsolation.HIGH_WATERMARK
): LogReadInfo = {
val fetchParams = consumerFetchParams(
maxBytes = maxBytes,
clientMetadata = clientMetadata,
isolation = isolation
)
val fetchPartitionData = new FetchRequest.PartitionData(
topicId,
fetchOffset,
FetchRequest.INVALID_LOG_START_OFFSET,
maxBytes,
leaderEpoch.map(Int.box).asJava,
lastFetchedEpoch.map(Int.box).asJava
)
partition.fetchRecords(
fetchParams,
fetchPartitionData,
fetchTimeMs,
maxBytes,
minOneMessage = true,
updateFetchState = false
)
}
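// Performs a follower fetch against the partition and updates the follower's fetch
// state; the replica's broker epoch defaults to defaultBrokerEpoch(replicaId).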
private def fetchFollower(
partition: Partition,
replicaId: Int,
fetchOffset: Long,
logStartOffset: Long = 0L,
maxBytes: Int = Int.MaxValue,
leaderEpoch: Option[Int] = None,
lastFetchedEpoch: Option[Int] = None,
fetchTimeMs: Long = time.milliseconds(),
topicId: Uuid = Uuid.ZERO_UUID,
replicaEpoch: Option[Long] = Option.empty
): LogReadInfo = {
val fetchParams = followerFetchParams(
replicaId,
maxBytes = maxBytes,
replicaEpoch = replicaEpoch.getOrElse(defaultBrokerEpoch(replicaId))
)
val fetchPartitionData = new FetchRequest.PartitionData(
topicId,
fetchOffset,
logStartOffset,
maxBytes,
leaderEpoch.map(Int.box).asJava,
lastFetchedEpoch.map(Int.box).asJava
)
partition.fetchRecords(
fetchParams,
fetchPartitionData,
fetchTimeMs,
maxBytes,
minOneMessage = true,
updateFetchState = true
)
}
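// Stubs the mocked KRaftMetadataCache so that getAliveBrokerEpoch returns the
// default broker epoch for each of the given brokers.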
private def addBrokerEpochToMockMetadataCache(kRaftMetadataCache: KRaftMetadataCache, brokers: List[Int]): Unit = {
brokers.foreach { broker =>
when(kRaftMetadataCache.getAliveBrokerEpoch(broker)).thenReturn(Option(defaultBrokerEpoch(broker)))
}
}
}