| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.server |
| |
| import java.io.File |
| import java.net.InetAddress |
| import java.nio.file.Files |
| import java.util |
| import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} |
| import java.util.concurrent.{CountDownLatch, TimeUnit} |
| import java.util.stream.IntStream |
| import java.util.{Collections, Optional, OptionalLong, Properties} |
| import kafka.api._ |
| import kafka.cluster.PartitionTest.MockPartitionListener |
| import kafka.cluster.{BrokerEndPoint, Partition} |
| import kafka.log._ |
| import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} |
| import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile} |
| import kafka.server.epoch.util.MockBlockingSender |
| import kafka.utils.timer.MockTimer |
| import kafka.utils.{Pool, TestUtils} |
| import org.apache.kafka.clients.FetchSessionHandler |
| import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} |
| import org.apache.kafka.common.message.LeaderAndIsrRequestData |
| import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState |
| import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset |
| import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState |
| import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} |
| import org.apache.kafka.common.metrics.Metrics |
| import org.apache.kafka.common.network.ListenerName |
| import org.apache.kafka.common.protocol.{ApiKeys, Errors} |
| import org.apache.kafka.common.record._ |
| import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView} |
| import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata |
| import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView |
| import org.apache.kafka.common.requests.FetchRequest.PartitionData |
| import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse |
| import org.apache.kafka.common.requests._ |
| import org.apache.kafka.common.security.auth.KafkaPrincipal |
| import org.apache.kafka.common.utils.{LogContext, Time, Utils} |
| import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} |
| import org.apache.kafka.image._ |
| import org.apache.kafka.metadata.LeaderConstants.NO_LEADER |
| import org.apache.kafka.metadata.LeaderRecoveryState |
| import org.apache.kafka.server.common.OffsetAndEpoch |
| import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 |
| import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} |
| import org.apache.kafka.server.util.{MockScheduler, MockTime} |
| import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig} |
| import org.junit.jupiter.api.Assertions._ |
| import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} |
| import org.junit.jupiter.params.ParameterizedTest |
| import org.junit.jupiter.params.provider.ValueSource |
| import com.yammer.metrics.core.Gauge |
| import kafka.log.remote.RemoteLogManager |
| import org.apache.kafka.common.internals.Topic |
| import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction} |
| import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} |
| import org.mockito.invocation.InvocationOnMock |
| import org.mockito.stubbing.Answer |
| import org.mockito.{ArgumentCaptor, ArgumentMatchers} |
| import org.mockito.ArgumentMatchers.{any, anyInt, anyMap, anySet, anyString} |
| import org.mockito.Mockito.{doReturn, mock, mockConstruction, never, reset, times, verify, verifyNoMoreInteractions, when} |
| |
| import scala.collection.{Map, Seq, mutable} |
| import scala.compat.java8.OptionConverters.RichOptionForJava8 |
| import scala.jdk.CollectionConverters._ |
| |
| class ReplicaManagerTest { |
| |
| val topic = "test-topic" |
| val topicId = Uuid.randomUuid() |
| val topicIds = scala.Predef.Map("test-topic" -> topicId) |
| val topicNames = scala.Predef.Map(topicId -> "test-topic") |
| val time = new MockTime |
| val scheduler = new MockScheduler(time) |
| val metrics = new Metrics |
| var alterPartitionManager: AlterPartitionManager = _ |
| var config: KafkaConfig = _ |
| var quotaManager: QuotaManagers = _ |
| var mockRemoteLogManager: RemoteLogManager = _ |
| |
| // Constants defined for readability |
| val zkVersion = 0 |
| val correlationId = 0 |
| var controllerEpoch = 0 |
| val brokerEpoch = 0L |
| |
| @BeforeEach |
| def setUp(): Unit = { |
| val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) |
| config = KafkaConfig.fromProps(props) |
| alterPartitionManager = mock(classOf[AlterPartitionManager]) |
| quotaManager = QuotaFactory.instantiate(config, metrics, time, "") |
| mockRemoteLogManager = mock(classOf[RemoteLogManager]) |
| } |
| |
| @AfterEach |
| def tearDown(): Unit = { |
| TestUtils.clearYammerMetrics() |
| Option(quotaManager).foreach(_.shutdown()) |
| metrics.close() |
| } |
| |
| @Test |
| def testHighWaterMarkDirectoryMapping(): Unit = { |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| val rm = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| try { |
| val partition = rm.createPartition(new TopicPartition(topic, 1)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) |
| rm.checkpointHighWatermarks() |
| } finally { |
| // shutdown the replica manager upon test completion |
| rm.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testHighwaterMarkRelativeDirectoryMapping(): Unit = { |
| val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) |
| props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) |
| val config = KafkaConfig.fromProps(props) |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| val rm = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| try { |
| val partition = rm.createPartition(new TopicPartition(topic, 1)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) |
| rm.checkpointHighWatermarks() |
| } finally { |
| // shutdown the replica manager upon test completion |
| rm.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testIllegalRequiredAcks(): Unit = { |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| val rm = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager, |
| threadNamePrefix = Option(this.getClass.getName)) |
| try { |
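| // acks = 3 is not a valid setting, so the append should fail immediately with INVALID_REQUIRED_ACKS. |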
| def callback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { |
| assertEquals(Errors.INVALID_REQUIRED_ACKS, responseStatus.values.head.error) |
| } |
| rm.appendRecords( |
| timeout = 0, |
| requiredAcks = 3, |
| internalTopicsAllowed = false, |
| origin = AppendOrigin.CLIENT, |
| entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(CompressionType.NONE, |
| new SimpleRecord("first message".getBytes))), |
| responseCallback = callback) |
| } finally { |
| rm.shutdown(checkpointHW = false) |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
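| // Stubs the metadata cache's alive-broker lookups (hasAliveBroker, getAliveBrokerNode(s)) to answer from the given broker list. |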
| private def mockGetAliveBrokerFunctions(cache: MetadataCache, aliveBrokers: Seq[Node]): Unit = { |
| when(cache.hasAliveBroker(anyInt)).thenAnswer(new Answer[Boolean]() { |
| override def answer(invocation: InvocationOnMock): Boolean = { |
| aliveBrokers.map(_.id()).contains(invocation.getArgument(0).asInstanceOf[Int]) |
| } |
| }) |
| when(cache.getAliveBrokerNode(anyInt, any[ListenerName])). |
| thenAnswer(new Answer[Option[Node]]() { |
| override def answer(invocation: InvocationOnMock): Option[Node] = { |
| aliveBrokers.find(node => node.id == invocation.getArgument(0).asInstanceOf[Integer]) |
| } |
| }) |
| when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) |
| } |
| |
| @Test |
| def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = { |
| val dir1 = TestUtils.tempDir() |
| val dir2 = TestUtils.tempDir() |
| val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) |
| props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) |
| val config = KafkaConfig.fromProps(props) |
| val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) |
| val metadataCache: MetadataCache = mock(classOf[MetadataCache]) |
| mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) |
| when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) |
| val rm = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = logManager, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| val partition = rm.createPartition(new TopicPartition(topic, 0)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) |
| |
| rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(Seq[Integer](0).asJava) |
| .setPartitionEpoch(0) |
| .setReplicas(Seq[Integer](0).asJava) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, Uuid.randomUuid()), |
| Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ()) |
| appendRecords(rm, new TopicPartition(topic, 0), |
| MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes()))) |
| logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath) |
| |
| partition.createLogIfNotExists(isNew = true, isFutureReplica = true, |
| new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) |
| // remove cache to disable OffsetsForLeaderEpoch API |
| partition.futureLog.get.leaderEpochCache = None |
| |
| // this method should use the HW of the future log to create the log dir fetcher. Otherwise, it causes an offset mismatch error |
| rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None) |
| rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, s.fetchOffset))) |
| // make sure the alter log dir thread has processed the data |
| rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.doWork()) |
| assertEquals(Set.empty, rm.replicaAlterLogDirsManager.failedPartitions.partitions()) |
| // the future log becomes the current log, so the partition state should get removed |
| rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => assertEquals(None, t.fetchState(new TopicPartition(topic, 0)))) |
| } |
| |
| @Test |
| def testClearPurgatoryOnBecomingFollower(): Unit = { |
| val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) |
| props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) |
| val config = KafkaConfig.fromProps(props) |
| val logProps = new Properties() |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps)) |
| val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1)) |
| val metadataCache: MetadataCache = mock(classOf[MetadataCache]) |
| mockGetAliveBrokerFunctions(metadataCache, aliveBrokers) |
| when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) |
| val rm = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) |
| |
| val partition = rm.createPartition(new TopicPartition(topic, 0)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) |
| // Make this replica the leader. |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| rm.getPartitionOrException(new TopicPartition(topic, 0)) |
| .localLogOrException |
| |
| val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes())) |
| val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response => |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error) |
| } |
| |
| // Make this replica the follower |
| val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(1) |
| .setLeaderEpoch(1) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) |
| |
| assertTrue(appendResult.hasFired) |
| } finally { |
| rm.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def checkRemoveMetricsCountMatchRegisterCount(): Unit = { |
| val mockLogMgr = mock(classOf[LogManager]) |
| doReturn(Seq.empty, Seq.empty).when(mockLogMgr).liveLogDirs |
| |
| val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) |
| try { |
| val rm = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager, |
| threadNamePrefix = Option(this.getClass.getName)) |
| |
| // shutdown ReplicaManager so that metrics are removed |
| rm.shutdown() |
| |
| // Use the second instance of the metrics group that is constructed. The first instance is constructed by |
| // ReplicaManager constructor > BrokerTopicStats > BrokerTopicMetrics. |
| val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1) |
| verify(mockMetricsGroup, times(9)).newGauge(anyString(), any()) |
| verify(mockMetricsGroup, times(3)).newMeter(anyString(), anyString(), any(classOf[TimeUnit])) |
| verify(mockMetricsGroup, times(12)).removeMetric(anyString()) |
| |
| // assert that we have verified all invocations on the mocked metrics group |
| verifyNoMoreInteractions(mockMetricsGroup) |
| } finally { |
| if (mockMetricsGroupCtor != null) { |
| mockMetricsGroupCtor.close() |
| } |
| } |
| } |
| |
| @Test |
| def testFencedErrorCausedByBecomeLeader(): Unit = { |
| testFencedErrorCausedByBecomeLeader(0) |
| testFencedErrorCausedByBecomeLeader(1) |
| testFencedErrorCausedByBecomeLeader(10) |
| } |
| |
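| // Verifies that fenced errors hit by the ReplicaAlterLogDirsThread are recoverable: the leader epoch is |
| // bumped `loopEpochChange` times while the log dir move is in flight, and the move should still complete. |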
| private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicPartition = new TopicPartition(topic, 0) |
| replicaManager.createPartition(topicPartition) |
| .createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| |
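| // Builds a LeaderAndIsr request for the test partition with leader broker 0 and the given leader epoch. |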
| def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(epoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ()) |
| val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) |
| assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size) |
| |
| val previousReplicaFolder = partition.log.get.dir.getParentFile |
| // find a live log dir that is different from the current one |
| val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).head |
| assertEquals(0, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size) |
| replicaManager.alterReplicaLogDirs(Map(topicPartition -> newReplicaFolder.getAbsolutePath)) |
| // make sure the future log is created |
| replicaManager.futureLocalLogOrException(topicPartition) |
| assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size) |
| (1 to loopEpochChange).foreach(epoch => replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => ())) |
| // wait for the ReplicaAlterLogDirsThread to complete |
| TestUtils.waitUntilTrue(() => { |
| replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads() |
| replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.isEmpty |
| }, s"ReplicaAlterLogDirsThread should be gone") |
| |
| // the fenced error should be recoverable |
| assertEquals(0, replicaManager.replicaAlterLogDirsManager.failedPartitions.size) |
| // the replica change is completed after retrying |
| assertTrue(partition.futureLog.isEmpty) |
| assertEquals(newReplicaFolder.getAbsolutePath, partition.log.get.dir.getParent) |
| // change the replica folder again |
| val response = replicaManager.alterReplicaLogDirs(Map(topicPartition -> previousReplicaFolder.getAbsolutePath)) |
| assertNotEquals(0, response.size) |
| response.values.foreach(assertEquals(Errors.NONE, _)) |
| // should succeed in invoking the ReplicaAlterLogDirsThread again |
| assertEquals(1, replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testReceiveOutOfOrderSequenceExceptionWithLogStartOffset(): Unit = { |
| val timer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| |
| val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| |
| // Make this replica the leader. |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| Collections.singletonMap(topic, Uuid.randomUuid()), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) |
| .localLogOrException |
| |
| val producerId = 234L |
| val epoch = 5.toShort |
| |
| // write a few batches as part of a transaction |
| val numRecords = 3 |
| for (sequence <- 0 until numRecords) { |
| val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, epoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
| assertEquals(0, partition.logStartOffset) |
| |
| // Append a record with an out of range sequence. We should get the OutOfOrderSequence error code with the log |
| // start offset set. |
| val outOfRangeSequence = numRecords + 10 |
| val record = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, epoch, outOfRangeSequence, |
| new SimpleRecord(s"message: $outOfRangeSequence".getBytes)) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), record).onFire { response => |
| assertEquals(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, response.error) |
| assertEquals(0, response.logStartOffset) |
| } |
| |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testProducerIdCountMetrics(): Unit = { |
| val timer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| |
| // Create a couple of partitions for the topic. |
| val partition0 = replicaManager.createPartition(new TopicPartition(topic, 0)) |
| partition0.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| val partition1 = replicaManager.createPartition(new TopicPartition(topic, 1)) |
| partition1.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| |
| // Make this replica the leader for the partitions. |
| Seq(0, 1).foreach { partition => |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| Collections.singletonMap(topic, Uuid.randomUuid()), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava, |
| false).build() |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) |
| replicaManager.getPartitionOrException(new TopicPartition(topic, partition)) |
| .localLogOrException |
| } |
| |
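| // Appends one idempotent record with the given producer id and sequence to the given partition and expects success. |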
| def appendRecord(pid: Long, sequence: Int, partition: Int): Unit = { |
| val epoch = 42.toShort |
| val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, pid, epoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| appendRecords(replicaManager, new TopicPartition(topic, partition), records).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
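| // Reads the ReplicaManager ProducerIdCount gauge from the Yammer metrics registry. |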
| def replicaManagerMetricValue(): Int = { |
| KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.filter { case (metricName, _) => |
| metricName.getName == "ProducerIdCount" && metricName.getType == replicaManager.getClass.getSimpleName |
| }.head._2.asInstanceOf[Gauge[Int]].value |
| } |
| |
| // Initially all metrics are 0. |
| assertEquals(0, replicaManagerMetricValue()) |
| |
| val pid1 = 123L |
| // Produce a record from 1st pid to 1st partition. |
| appendRecord(pid1, 0, 0) |
| assertEquals(1, replicaManagerMetricValue()) |
| |
| // Produce another record from 1st pid to 1st partition, metrics shouldn't change. |
| appendRecord(pid1, 1, 0) |
| assertEquals(1, replicaManagerMetricValue()) |
| |
| // Produce a record from 2nd pid to 1st partition |
| val pid2 = 456L |
| appendRecord(pid2, 1, 0) |
| assertEquals(2, replicaManagerMetricValue()) |
| |
| // Produce a record from 1st pid to 2nd partition |
| appendRecord(pid1, 0, 1) |
| assertEquals(3, replicaManagerMetricValue()) |
| |
| // Simulate producer id expiration. |
| // We pass Long.MaxValue - 1 because the record timestamps in this test are -1, so when |
| // the expiration check subtracts the timestamp, it yields the max value without overflowing. |
| partition0.removeExpiredProducers(Long.MaxValue - 1) |
| assertEquals(1, replicaManagerMetricValue()) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testPartitionsWithLateTransactionsCount(): Unit = { |
| val timer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) |
| val topicPartition = new TopicPartition(topic, 0) |
| |
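| // Asserts the current value of the PartitionsWithLateTransactionsCount gauge (None once it has been unregistered). |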
| def assertLateTransactionCount(expectedCount: Option[Int]): Unit = { |
| assertEquals(expectedCount, TestUtils.yammerGaugeValue[Int]("PartitionsWithLateTransactionsCount")) |
| } |
| |
| try { |
| assertLateTransactionCount(Some(0)) |
| |
| val partition = replicaManager.createPartition(topicPartition) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| |
| // Make this replica the leader. |
| val brokerList = Seq[Integer](0, 1, 2).asJava |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| |
| // Start a transaction |
| val producerId = 234L |
| val epoch = 5.toShort |
| val sequence = 9 |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, |
| new SimpleRecord(time.milliseconds(), s"message $sequence".getBytes)) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| assertLateTransactionCount(Some(0)) |
| |
| // The transaction becomes late if not finished before the max transaction timeout passes |
| time.sleep(replicaManager.logManager.maxTransactionTimeoutMs + ProducerStateManager.LATE_TRANSACTION_BUFFER_MS) |
| assertLateTransactionCount(Some(0)) |
| time.sleep(1) |
| assertLateTransactionCount(Some(1)) |
| |
| // After the late transaction is aborted, we expect the count to return to 0 |
| val abortTxnMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0) |
| val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, abortTxnMarker) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), |
| abortRecordBatch, origin = AppendOrigin.COORDINATOR).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| assertLateTransactionCount(Some(0)) |
| } finally { |
| // After shutdown, the metric should no longer be registered |
| replicaManager.shutdown(checkpointHW = false) |
| assertLateTransactionCount(None) |
| } |
| } |
| |
| @Test |
| def testReadCommittedFetchLimitedAtLSO(): Unit = { |
| val timer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| |
| val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| |
| // Make this replica the leader. |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) |
| .localLogOrException |
| |
| val producerId = 234L |
| val epoch = 5.toShort |
| |
| // write a few batches as part of a transaction |
| val numRecords = 3 |
| for (sequence <- 0 until numRecords) { |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
| // fetch as follower to advance the high watermark |
| fetchPartitionAsFollower( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, numRecords, 0, 100000, Optional.empty()), |
| replicaId = 1 |
| ) |
| |
| // fetch should return empty since LSO should be stuck at 0 |
| var consumerFetchResult = fetchPartitionAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| isolationLevel = IsolationLevel.READ_COMMITTED) |
| var fetchData = consumerFetchResult.assertFired |
| assertEquals(Errors.NONE, fetchData.error) |
| assertTrue(fetchData.records.batches.asScala.isEmpty) |
| assertEquals(OptionalLong.of(0), fetchData.lastStableOffset) |
| assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions) |
| |
| // delayed fetch should timeout and return nothing |
| consumerFetchResult = fetchPartitionAsConsumer( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| isolationLevel = IsolationLevel.READ_COMMITTED, |
| minBytes = 1000, |
| maxWaitMs = 1000 |
| ) |
| assertFalse(consumerFetchResult.hasFired) |
| timer.advanceClock(1001) |
| |
| fetchData = consumerFetchResult.assertFired |
| assertEquals(Errors.NONE, fetchData.error) |
| assertTrue(fetchData.records.batches.asScala.isEmpty) |
| assertEquals(OptionalLong.of(0), fetchData.lastStableOffset) |
| assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions) |
| |
| // now commit the transaction |
| val endTxnMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0) |
| val commitRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), commitRecordBatch, |
| origin = AppendOrigin.COORDINATOR) |
| .onFire { response => assertEquals(Errors.NONE, response.error) } |
| |
| // the LSO has advanced, but the appended commit marker has not been replicated, so |
| // none of the data from the transaction should be visible yet |
| consumerFetchResult = fetchPartitionAsConsumer( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| isolationLevel = IsolationLevel.READ_COMMITTED |
| ) |
| |
| fetchData = consumerFetchResult.assertFired |
| assertEquals(Errors.NONE, fetchData.error) |
| assertTrue(fetchData.records.batches.asScala.isEmpty) |
| |
| // fetch as follower to advance the high watermark |
| fetchPartitionAsFollower( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, numRecords + 1, 0, 100000, Optional.empty()), |
| replicaId = 1 |
| ) |
| |
| // now all of the records should be fetchable |
| consumerFetchResult = fetchPartitionAsConsumer(replicaManager, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| isolationLevel = IsolationLevel.READ_COMMITTED) |
| |
| fetchData = consumerFetchResult.assertFired |
| assertEquals(Errors.NONE, fetchData.error) |
| assertEquals(OptionalLong.of(numRecords + 1), fetchData.lastStableOffset) |
| assertEquals(Optional.of(Collections.emptyList()), fetchData.abortedTransactions) |
| assertEquals(numRecords + 1, fetchData.records.batches.asScala.size) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testDelayedFetchIncludesAbortedTransactions(): Unit = { |
| val timer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(timer) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None) |
| |
| // Make this replica the leader. |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) |
| .localLogOrException |
| |
| val producerId = 234L |
| val epoch = 5.toShort |
| |
| // write a few batches as part of a transaction |
| val numRecords = 3 |
| for (sequence <- 0 until numRecords) { |
| val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, epoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), records).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
| // now abort the transaction |
| val endTxnMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0) |
| val abortRecordBatch = MemoryRecords.withEndTransactionMarker(producerId, epoch, endTxnMarker) |
| appendRecords(replicaManager, new TopicPartition(topic, 0), abortRecordBatch, |
| origin = AppendOrigin.COORDINATOR) |
| .onFire { response => assertEquals(Errors.NONE, response.error) } |
| |
| // fetch as follower to advance the high watermark |
| fetchPartitionAsFollower( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, numRecords + 1, 0, 100000, Optional.empty()), |
| replicaId = 1 |
| ) |
| |
| // Set the minBytes in order to force this request to enter purgatory. When it returns, we should still |
| // see the newly aborted transaction. |
| val fetchResult = fetchPartitionAsConsumer( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| isolationLevel = IsolationLevel.READ_COMMITTED, |
| minBytes = 10000, |
| maxWaitMs = 1000 |
| ) |
| assertFalse(fetchResult.hasFired) |
| |
| timer.advanceClock(1001) |
| val fetchData = fetchResult.assertFired |
| |
| assertEquals(Errors.NONE, fetchData.error) |
| assertEquals(OptionalLong.of(numRecords + 1), fetchData.lastStableOffset) |
| assertEquals(numRecords + 1, fetchData.records.records.asScala.size) |
| assertTrue(fetchData.abortedTransactions.isPresent) |
| assertEquals(1, fetchData.abortedTransactions.get.size) |
| |
| val abortedTransaction = fetchData.abortedTransactions.get.get(0) |
| assertEquals(0L, abortedTransaction.firstOffset) |
| assertEquals(producerId, abortedTransaction.producerId) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testFetchBeyondHighWatermark(): Unit = { |
| val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2)) |
| try { |
| val brokerList = Seq[Integer](0, 1, 2).asJava |
| |
| val partition = rm.createPartition(new TopicPartition(topic, 0)) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, |
| new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) |
| |
| // Make this replica the leader. |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(0) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build() |
| rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| rm.getPartitionOrException(new TopicPartition(topic, 0)) |
| .localLogOrException |
| |
| // Append a couple of messages. |
| for (i <- 1 to 2) { |
| val records = TestUtils.singletonRecords(s"message $i".getBytes) |
| appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
| // Followers are always allowed to fetch above the high watermark |
| val followerFetchResult = fetchPartitionAsFollower( |
| rm, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty()), |
| replicaId = 1 |
| ) |
| val followerFetchData = followerFetchResult.assertFired |
| assertEquals(Errors.NONE, followerFetchData.error, "Should not give an exception") |
| assertTrue(followerFetchData.records.batches.iterator.hasNext, "Should return some data") |
| |
| // Consumers are not allowed to consume above the high watermark. However, since the |
| // high watermark could be stale at the time of the request, we do not return an out of |
| // range error and instead return an empty record set. |
| val consumerFetchResult = fetchPartitionAsConsumer(rm, new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.empty())) |
| val consumerFetchData = consumerFetchResult.assertFired |
| assertEquals(Errors.NONE, consumerFetchData.error, "Should not give an exception") |
| assertEquals(MemoryRecords.EMPTY, consumerFetchData.records, "Should return empty response") |
| } finally { |
| rm.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testFollowerStateNotUpdatedIfLogReadFails(): Unit = { |
| val maxFetchBytes = 1024 * 1024 |
| val aliveBrokersIds = Seq(0, 1) |
| val leaderEpoch = 5 |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| brokerId = 0, aliveBrokersIds) |
| try { |
| val tp = new TopicPartition(topic, 0) |
| val tidp = new TopicIdPartition(topicId, tp) |
| val replicas = aliveBrokersIds.toList.map(Int.box).asJava |
| |
| // Broker 0 becomes leader of the partition |
| val leaderAndIsrPartitionState = new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(replicas) |
| .setIsNew(true) |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(leaderAndIsrPartitionState).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) |
| assertEquals(Errors.NONE, leaderAndIsrResponse.error) |
| |
| // Follower replica state is initialized, but initial state is not known |
| assertTrue(replicaManager.onlinePartition(tp).isDefined) |
| val partition = replicaManager.onlinePartition(tp).get |
| |
| assertTrue(partition.getReplica(1).isDefined) |
| val followerReplica = partition.getReplica(1).get |
| assertEquals(-1L, followerReplica.stateSnapshot.logStartOffset) |
| assertEquals(-1L, followerReplica.stateSnapshot.logEndOffset) |
| |
| // Leader appends some data |
| for (i <- 1 to 5) { |
| appendRecords(replicaManager, tp, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
| // We receive one valid request from the follower and replica state is updated |
| val validFetchPartitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, maxFetchBytes, |
| Optional.of(leaderEpoch)) |
| |
| val validFetchResult = fetchPartitionAsFollower( |
| replicaManager, |
| tidp, |
| validFetchPartitionData, |
| replicaId = 1 |
| ) |
| |
| assertEquals(Errors.NONE, validFetchResult.assertFired.error) |
| assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) |
| assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) |
| |
| // Next we receive an invalid request with a higher fetch offset, but an old epoch. |
| // We expect that the replica state does not get updated. |
| val invalidFetchPartitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 3L, 0L, maxFetchBytes, |
| Optional.of(leaderEpoch - 1)) |
| |
| |
| val invalidFetchResult = fetchPartitionAsFollower( |
| replicaManager, |
| tidp, |
| invalidFetchPartitionData, |
| replicaId = 1 |
| ) |
| |
| assertEquals(Errors.FENCED_LEADER_EPOCH, invalidFetchResult.assertFired.error) |
| assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) |
| assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) |
| |
| // Next we receive an invalid request with a higher fetch offset, but a diverging epoch. |
| // We expect that the replica state does not get updated. |
| val divergingFetchPartitionData = new FetchRequest.PartitionData(tidp.topicId, 3L, 0L, maxFetchBytes, |
| Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1)) |
| |
| val divergingEpochResult = fetchPartitionAsFollower( |
| replicaManager, |
| tidp, |
| divergingFetchPartitionData, |
| replicaId = 1 |
| ) |
| |
| assertEquals(Errors.NONE, divergingEpochResult.assertFired.error) |
| assertTrue(divergingEpochResult.assertFired.divergingEpoch.isPresent) |
| assertEquals(0L, followerReplica.stateSnapshot.logStartOffset) |
| assertEquals(0L, followerReplica.stateSnapshot.logEndOffset) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testFetchMessagesWithInconsistentTopicId(): Unit = { |
| val maxFetchBytes = 1024 * 1024 |
| val aliveBrokersIds = Seq(0, 1) |
| val leaderEpoch = 5 |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| brokerId = 0, aliveBrokersIds) |
| try { |
| val tp = new TopicPartition(topic, 0) |
| val tidp = new TopicIdPartition(topicId, tp) |
| val replicas = aliveBrokersIds.toList.map(Int.box).asJava |
| |
| // Broker 0 becomes leader of the partition |
| val leaderAndIsrPartitionState = new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(replicas) |
| .setIsNew(true) |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(leaderAndIsrPartitionState).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) |
| assertEquals(Errors.NONE, leaderAndIsrResponse.error) |
| |
| assertEquals(Some(topicId), replicaManager.getPartitionOrException(tp).topicId) |
| |
| // Capture each fetch response so the scenarios below can be verified |
| var successfulFetch: Seq[(TopicIdPartition, FetchPartitionData)] = Seq() |
| |
| val validFetchPartitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, maxFetchBytes, |
| Optional.of(leaderEpoch)) |
| |
| // Fetch messages simulating a different ID than the one in the log. |
| val inconsistentTidp = new TopicIdPartition(Uuid.randomUuid(), tidp.topicPartition) |
| def callback(response: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { |
| successfulFetch = response |
| } |
| |
| fetchPartitions( |
| replicaManager, |
| replicaId = 1, |
| fetchInfos = Seq(inconsistentTidp -> validFetchPartitionData), |
| responseCallback = callback |
| ) |
| |
| val fetch1 = successfulFetch.headOption.filter(_._1 == inconsistentTidp).map(_._2) |
| assertTrue(fetch1.isDefined) |
| assertEquals(Errors.INCONSISTENT_TOPIC_ID, fetch1.get.error) |
| |
| // Simulate a fetch request that does not use topic IDs against a log that has one. |
| // We should not see topic ID errors. |
| val zeroTidp = new TopicIdPartition(Uuid.ZERO_UUID, tidp.topicPartition) |
| fetchPartitions( |
| replicaManager, |
| replicaId = 1, |
| fetchInfos = Seq(zeroTidp -> validFetchPartitionData), |
| responseCallback = callback |
| ) |
| val fetch2 = successfulFetch.headOption.filter(_._1 == zeroTidp).map(_._2) |
| assertTrue(fetch2.isDefined) |
| assertEquals(Errors.NONE, fetch2.get.error) |
| |
| // Next create a topic without a topic ID written in the log. |
| val tp2 = new TopicPartition("noIdTopic", 0) |
| val tidp2 = new TopicIdPartition(Uuid.randomUuid(), tp2) |
| |
| // Broker 0 becomes leader of the partition |
| val leaderAndIsrPartitionState2 = new LeaderAndIsrPartitionState() |
| .setTopicName("noIdTopic") |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(replicas) |
| .setIsNew(true) |
| val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(leaderAndIsrPartitionState2).asJava, |
| Collections.emptyMap(), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) |
| assertEquals(Errors.NONE, leaderAndIsrResponse2.error) |
| |
| assertEquals(None, replicaManager.getPartitionOrException(tp2).topicId) |
| |
| // Fetch messages simulating the request containing a topic ID. We should not have an error. |
| fetchPartitions( |
| replicaManager, |
| replicaId = 1, |
| fetchInfos = Seq(tidp2 -> validFetchPartitionData), |
| responseCallback = callback |
| ) |
| val fetch3 = successfulFetch.headOption.filter(_._1 == tidp2).map(_._2) |
| assertTrue(fetch3.isDefined) |
| assertEquals(Errors.NONE, fetch3.get.error) |
| |
| // Fetch messages simulating the request not containing a topic ID. We should not have an error. |
| val zeroTidp2 = new TopicIdPartition(Uuid.ZERO_UUID, tidp2.topicPartition) |
| fetchPartitions( |
| replicaManager, |
| replicaId = 1, |
| fetchInfos = Seq(zeroTidp2 -> validFetchPartitionData), |
| responseCallback = callback |
| ) |
| val fetch4 = successfulFetch.headOption.filter(_._1 == zeroTidp2).map(_._2) |
| assertTrue(fetch4.isDefined) |
| assertEquals(Errors.NONE, fetch4.get.error) |
| |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| /** |
| * If a follower sends a fetch request for 2 partitions and it's no longer the follower for one of them, the other |
| * partition should not be affected. |
| */ |
| @Test |
| def testFetchMessagesWhenNotFollowerForOnePartition(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2)) |
| |
| try { |
| // Create 2 partitions, assign replica 0 as the leader for both, and a different follower (1 and 2) for each |
| val tp0 = new TopicPartition(topic, 0) |
| val tp1 = new TopicPartition(topic, 1) |
| val topicId = Uuid.randomUuid() |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val tidp1 = new TopicIdPartition(topicId, tp1) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| val partition1Replicas = Seq[Integer](0, 2).asJava |
| val topicIds = Map(tp0.topic -> topicId, tp1.topic -> topicId).asJava |
| val leaderEpoch = 0 |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq( |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true), |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp1.topic) |
| .setPartitionIndex(tp1.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(partition1Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition1Replicas) |
| .setIsNew(true) |
| ).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) |
| |
| // Append a couple of messages. |
| for (i <- 1 to 2) { |
| appendRecords(replicaManager, tp0, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| appendRecords(replicaManager, tp1, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
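| // Verifies that the fetch for tp0 succeeds while the fetch for tp1, where replica 1 is not assigned, returns an error. |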
| def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { |
| val responseStatusMap = responseStatus.toMap |
| assertEquals(2, responseStatus.size) |
| assertEquals(Set(tidp0, tidp1), responseStatusMap.keySet) |
| |
| val tp0Status = responseStatusMap.get(tidp0) |
| assertTrue(tp0Status.isDefined) |
| // the response contains the high watermark on the leader before it is updated based |
| // on this fetch request |
| assertEquals(0, tp0Status.get.highWatermark) |
| assertEquals(OptionalLong.of(0), tp0Status.get.lastStableOffset) |
| assertEquals(Errors.NONE, tp0Status.get.error) |
| assertTrue(tp0Status.get.records.batches.iterator.hasNext) |
| |
| // Replica 1 is not a valid replica for partition 1 |
| val tp1Status = responseStatusMap.get(tidp1) |
| assertEquals(Errors.UNKNOWN_LEADER_EPOCH, tp1Status.get.error) |
| } |
| |
| fetchPartitions( |
| replicaManager, |
| replicaId = 1, |
| fetchInfos = Seq( |
| tidp0 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.of[Integer](leaderEpoch)), |
| tidp1 -> new PartitionData(Uuid.ZERO_UUID, 1, 0, 100000, Optional.of[Integer](leaderEpoch)) |
| ), |
| responseCallback = fetchCallback, |
| maxWaitMs = 1000, |
| minBytes = 0, |
| maxBytes = Int.MaxValue |
| ) |
| |
| val tp0Log = replicaManager.localLog(tp0) |
| assertTrue(tp0Log.isDefined) |
| assertEquals(1, tp0Log.get.highWatermark, "hw should be incremented") |
| |
| val tp1Replica = replicaManager.localLog(tp1) |
| assertTrue(tp1Replica.isDefined) |
| assertEquals(0, tp1Replica.get.highWatermark, "hw should not be incremented") |
| |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @Test |
| def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = { |
| verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new Properties, expectTruncation = false) |
| } |
| |
| @Test |
| def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdateIbp26(): Unit = { |
| val extraProps = new Properties |
| extraProps.put(KafkaConfig.InterBrokerProtocolVersionProp, IBP_2_6_IV0.version) |
| verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps, expectTruncation = true) |
| } |
| |
| /** |
| * If a partition becomes a follower and the leader is unchanged it should check for truncation |
| * if the epoch has increased by more than one (which suggests it has missed an update). For |
| * IBP version 2.7 onwards, we don't require this since we can truncate at any time based |
| * on diverging epochs returned in fetch responses. |
| */ |
| private def verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps: Properties, |
| expectTruncation: Boolean): Unit = { |
| val topicPartition = 0 |
| val topicId = Uuid.randomUuid() |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val controllerId = 0 |
| val controllerEpoch = 0 |
| var leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId) |
| val countDownLatch = new CountDownLatch(1) |
| val offsetFromLeader = 5 |
| |
| // Prepare the mocked components for the test |
| val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, |
| expectTruncation = expectTruncation, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = Some(topicId)) |
| |
| try { |
| // Initialize partition state to follower, with leader = 1, leaderEpoch = 1 |
| val tp = new TopicPartition(topic, topicPartition) |
| val partition = replicaManager.createPartition(tp) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| partition.makeFollower( |
| leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), |
| offsetCheckpoints, |
| None) |
| |
| // Make the local partition a follower - because the epoch increased by more than 1, truncation should |
| // trigger even though the leader does not change |
| leaderEpoch += leaderEpochIncrement |
| val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, |
| controllerId, controllerEpoch, brokerEpoch, |
| Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(followerBrokerId, "host1", 0), |
| new Node(leaderBrokerId, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0, |
| (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId)) |
| assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS)) |
| |
| // Truncation should have happened once |
| if (expectTruncation) { |
| verify(mockLogMgr).truncateTo(Map(tp -> offsetFromLeader), isFuture = false) |
| } |
| |
| verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(tp), any()) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testReplicaSelector(): Unit = { |
| val topicPartition = 0 |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId) |
| val countDownLatch = new CountDownLatch(1) |
| |
| // Prepare the mocked components for the test |
| val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, |
| leaderBrokerId, countDownLatch, expectTruncation = true) |
| |
| val tp = new TopicPartition(topic, topicPartition) |
| val partition = replicaManager.createPartition(tp) |
| |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| partition.makeLeader( |
| leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), |
| offsetCheckpoints, |
| None) |
| |
| val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", |
| InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") |
| |
| // We expect to select the leader, which means we return None |
| val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica( |
| partition, metadata, FetchRequest.ORDINARY_CONSUMER_ID, 1L, System.currentTimeMillis) |
| assertFalse(preferredReadReplica.isDefined) |
| } |
| |
| @Test |
| def testPreferredReplicaAsFollower(): Unit = { |
| val topicPartition = 0 |
| val topicId = Uuid.randomUuid() |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val countDownLatch = new CountDownLatch(1) |
| |
| // Prepare the mocked components for the test |
| val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, |
| leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId)) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| |
| // Make this replica the follower |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(1) |
| .setLeaderEpoch(1) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) |
| |
| val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", |
| InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") |
| |
| val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| clientMetadata = Some(metadata)) |
| |
| // Fetch from follower succeeds |
| assertTrue(consumerResult.hasFired) |
| |
| // But only the leader will compute the preferred replica |
| assertFalse(consumerResult.assertFired.preferredReadReplica.isPresent) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testPreferredReplicaAsLeader(): Unit = { |
| val topicPartition = 0 |
| val topicId = Uuid.randomUuid() |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val countDownLatch = new CountDownLatch(1) |
| |
| // Prepare the mocked components for the test |
| val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, |
| leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId)) |
| |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| |
| // Make this replica the leader |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) |
| |
| val metadata = new DefaultClientMetadata("rack-a", "client-id", |
| InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") |
| |
| val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| clientMetadata = Some(metadata)) |
| |
| // Fetch from leader succeeds |
| assertTrue(consumerResult.hasFired) |
| |
| // No preferred read replica is returned, since the selection should just be the leader |
| assertFalse(consumerResult.assertFired.preferredReadReplica.isPresent) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testPreferredReplicaAsLeaderWhenSameRackFollowerIsOutOfIsr(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName)) |
| |
| try { |
| val leaderBrokerId = 0 |
| val followerBrokerId = 1 |
| val leaderNode = new Node(leaderBrokerId, "host1", 0, "rack-a") |
| val followerNode = new Node(followerBrokerId, "host2", 1, "rack-b") |
| val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava |
| val topicId = Uuid.randomUuid() |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| |
| when(replicaManager.metadataCache.getPartitionReplicaEndpoints( |
| tp0, |
| new ListenerName("default") |
| )).thenReturn(Map( |
| leaderBrokerId -> leaderNode, |
| followerBrokerId -> followerNode |
| ).toMap) |
| |
| // Make this replica the leader and remove follower from ISR. |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder( |
| ApiKeys.LEADER_AND_ISR.latestVersion, |
| 0, |
| 0, |
| brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(leaderBrokerId) |
| .setLeaderEpoch(1) |
| .setIsr(Seq[Integer](leaderBrokerId).asJava) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(leaderNode, followerNode).asJava).build() |
| |
| replicaManager.becomeLeaderOrFollower(2, leaderAndIsrRequest, (_, _) => ()) |
| |
| appendRecords(replicaManager, tp0, TestUtils.singletonRecords("message".getBytes)).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| // Fetch as follower to initialise the log end offset of the replica |
| fetchPartitionAsFollower( |
| replicaManager, |
| new TopicIdPartition(topicId, new TopicPartition(topic, 0)), |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| replicaId = 1 |
| ) |
| |
| val metadata = new DefaultClientMetadata("rack-b", "client-id", |
| InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") |
| |
| val consumerResult = fetchPartitionAsConsumer( |
| replicaManager, |
| tidp0, |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), |
| clientMetadata = Some(metadata) |
| ) |
| |
| // Fetch from leader succeeds |
| assertTrue(consumerResult.hasFired) |
| |
| // PartitionView passed to ReplicaSelector should not contain the follower as it's not in the ISR |
| val expectedReplicaViews = Set(new DefaultReplicaView(leaderNode, 1, 0)) |
| val partitionView = replicaManager.replicaSelectorOpt.get |
| .asInstanceOf[MockReplicaSelector].getPartitionViewArgument |
| |
| assertTrue(partitionView.isDefined) |
| assertEquals(expectedReplicaViews.asJava, partitionView.get.replicas) |
| } finally { |
| replicaManager.shutdown() |
| } |
| } |
| |
| @Test |
| def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, classOf[MockReplicaSelector].getName)) |
| try { |
| val leaderBrokerId = 0 |
| val followerBrokerId = 1 |
| val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava |
| val topicId = Uuid.randomUuid() |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| |
| // Make this replica the follower |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(1) |
| .setLeaderEpoch(1) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) |
| |
| val metadata = new DefaultClientMetadata("rack-a", "client-id", |
| InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") |
| |
| val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, |
| Optional.empty()), clientMetadata = Some(metadata)) |
| |
| // Fetch from follower succeeds |
| assertTrue(consumerResult.hasFired) |
| |
| // Expect the preferred read replica selection not to run |
| assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getSelectionCount) |
| |
| // Only the leader will compute the preferred replica |
| assertFalse(consumerResult.assertFired.preferredReadReplica.isPresent) |
| |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")) |
| |
| try { |
| val leaderBrokerId = 0 |
| val followerBrokerId = 1 |
| val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava |
| val topicId = Uuid.randomUuid() |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| |
| when(replicaManager.metadataCache.getPartitionReplicaEndpoints( |
| tp0, |
| new ListenerName("default") |
| )).thenReturn(Map( |
| leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), |
| followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b") |
| ).toMap) |
| |
| // Make this replica the leader |
| val leaderEpoch = 1 |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) |
| |
| // The leader must record the follower's fetch offset to make it eligible for follower fetch selection |
| val followerFetchData = new PartitionData(topicId, 0L, 0L, Int.MaxValue, Optional.of(Int.box(leaderEpoch)), Optional.empty[Integer]) |
| fetchPartitionAsFollower( |
| replicaManager, |
| tidp0, |
| followerFetchData, |
| replicaId = followerBrokerId |
| ) |
| |
| val metadata = new DefaultClientMetadata("rack-b", "client-id", |
| InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") |
| |
| // If a preferred read replica is selected, the fetch response returns immediately, even if min bytes and timeout conditions are not met. |
| val consumerResult = fetchPartitionAsConsumer(replicaManager, tidp0, |
| new PartitionData(topicId, 0, 0, 100000, Optional.empty()), |
| minBytes = 1, clientMetadata = Some(metadata), maxWaitMs = 5000) |
| |
| // Fetch from leader succeeds |
| assertTrue(consumerResult.hasFired) |
| |
| // No delayed fetch was inserted |
| assertEquals(0, replicaManager.delayedFetchPurgatory.watched) |
| |
| // Returns a preferred replica |
| assertTrue(consumerResult.assertFired.preferredReadReplica.isPresent) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testFollowerFetchWithDefaultSelectorNoForcedHwPropagation(): Unit = { |
| val topicPartition = 0 |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val countDownLatch = new CountDownLatch(1) |
| val timer = new MockTimer(time) |
| |
| // Prepare the mocked components for the test |
| val (replicaManager, _) = prepareReplicaManagerAndLogManager(timer, |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, |
| leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId)) |
| |
| val brokerList = Seq[Integer](0, 1).asJava |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| |
| // Make this replica the follower |
| val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(false)).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) |
| |
| val simpleRecords = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)) |
| val appendResult = appendRecords(replicaManager, tp0, |
| MemoryRecords.withRecords(CompressionType.NONE, simpleRecords.toSeq: _*), AppendOrigin.CLIENT) |
| |
| // Increment the hw in the leader by fetching from the last offset |
| val fetchOffset = simpleRecords.size |
| var followerResult = fetchPartitionAsFollower( |
| replicaManager, |
| tidp0, |
| new PartitionData(Uuid.ZERO_UUID, fetchOffset, 0, 100000, Optional.empty()), |
| replicaId = 1, |
| minBytes = 0 |
| ) |
| assertTrue(followerResult.hasFired) |
| assertEquals(0, followerResult.assertFired.highWatermark) |
| |
| assertTrue(appendResult.hasFired, "Expected producer request to be acked") |
| |
| // Fetch from the same offset, no new data is expected and hence the fetch request should |
| // go to the purgatory |
| followerResult = fetchPartitionAsFollower( |
| replicaManager, |
| tidp0, |
| new PartitionData(Uuid.ZERO_UUID, fetchOffset, 0, 100000, Optional.empty()), |
| replicaId = 1, |
| minBytes = 1000, |
| maxWaitMs = 1000 |
| ) |
| assertFalse(followerResult.hasFired, "Request completed immediately unexpectedly") |
| |
| // Complete the request in the purgatory by advancing the clock |
| timer.advanceClock(1001) |
| assertTrue(followerResult.hasFired) |
| |
| assertEquals(fetchOffset, followerResult.assertFired.highWatermark) |
| } |
| |
| @Test |
| def testUnknownReplicaSelector(): Unit = { |
| val topicPartition = 0 |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val countDownLatch = new CountDownLatch(1) |
| |
| val props = new Properties() |
| props.put(KafkaConfig.ReplicaSelectorClassProp, "not-a-class") |
| assertThrows(classOf[ClassNotFoundException], () => prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, |
| leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)) |
| } |
| |
| @Test |
| def testDefaultReplicaSelector(): Unit = { |
| val topicPartition = 0 |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val countDownLatch = new CountDownLatch(1) |
| |
| val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, |
| leaderBrokerId, countDownLatch, expectTruncation = true) |
| assertFalse(replicaManager.replicaSelectorOpt.isDefined) |
| } |
| |
| @Test |
| def testFetchFollowerNotAllowedForOlderClients(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1)) |
| |
| try { |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(1) |
| .setLeaderEpoch(0) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) |
| |
| // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+) |
| val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") |
| var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.of(0)) |
| var fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, |
| clientMetadata = Some(clientMetadata)) |
| assertEquals(Errors.NONE, fetchResult.assertFired.error) |
| |
| // Fetch from follower, with empty ClientMetadata (which implies an older version) |
| partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.of(0)) |
| fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData) |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testFetchRequestRateMetrics(): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
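| // Both the all-topics aggregate and the per-topic fetch request rate metrics should count |
| // every fetch, whether it completed immediately or went through the purgatory. |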
| def assertMetricCount(expected: Int): Unit = { |
| assertEquals(expected, replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count) |
| assertEquals(expected, replicaManager.brokerTopicStats.topicStats(topic).totalFetchRequestRate.count) |
| } |
| |
| val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.empty()) |
| |
| val nonPurgatoryFetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData) |
| assertEquals(Errors.NONE, nonPurgatoryFetchResult.assertFired.error) |
| assertMetricCount(1) |
| |
| val purgatoryFetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) |
| assertFalse(purgatoryFetchResult.hasFired) |
| mockTimer.advanceClock(11) |
| assertEquals(Errors.NONE, purgatoryFetchResult.assertFired.error) |
| assertMetricCount(2) |
| } |
| |
| @Test |
| def testBecomeFollowerWhileOldClientFetchInPurgatory(): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| try { |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.empty()) |
| val fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) |
| assertFalse(fetchResult.hasFired) |
| |
| // Become a follower and ensure that the delayed fetch returns immediately |
| val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(1) |
| .setLeaderEpoch(2) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testBecomeFollowerWhileNewClientFetchInPurgatory(): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| try { |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") |
| val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.of(1)) |
| val fetchResult = fetchPartitionAsConsumer( |
| replicaManager, |
| tidp0, |
| partitionData, |
| clientMetadata = Some(clientMetadata), |
| maxWaitMs = 10 |
| ) |
| assertFalse(fetchResult.hasFired) |
| |
| // Become a follower and ensure that the delayed fetch returns immediately |
| val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(1) |
| .setLeaderEpoch(2) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) |
| assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.assertFired.error) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testFetchFromLeaderAlwaysAllowed(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") |
| var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.of(1)) |
| var fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, clientMetadata = Some(clientMetadata)) |
| assertEquals(Errors.NONE, fetchResult.assertFired.error) |
| |
| partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.empty()) |
| fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, clientMetadata = Some(clientMetadata)) |
| assertEquals(Errors.NONE, fetchResult.assertFired.error) |
| } |
| |
| @Test |
| def testClearFetchPurgatoryOnStopReplica(): Unit = { |
| // As part of a reassignment, we may send StopReplica to the old leader. |
| // In this case, we should ensure that pending purgatory operations are cancelled |
| // immediately rather than being left to time out. |
| |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val tidp0 = new TopicIdPartition(topicId, tp0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, |
| Optional.of(1)) |
| val fetchResult = fetchPartitionAsConsumer(replicaManager, tidp0, partitionData, maxWaitMs = 10) |
| assertFalse(fetchResult.hasFired) |
| when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) |
| |
| // We have a fetch in purgatory, now receive a stop replica request and |
| // assert that the fetch returns with a NOT_LEADER_OR_FOLLOWER error |
| replicaManager.stopReplicas(2, 0, 0, |
| mutable.Map(tp0 -> new StopReplicaPartitionState() |
| .setPartitionIndex(tp0.partition) |
| .setDeletePartition(true) |
| .setLeaderEpoch(LeaderAndIsr.EpochDuringDelete))) |
| |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) |
| } |
| |
| @Test |
| def testClearProducePurgatoryOnStopReplica(): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(1) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true)).asJava, |
| topicIds.asJava, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val produceResult = sendProducerAppend(replicaManager, tp0, 3) |
| assertNull(produceResult.get) |
| |
| when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) |
| |
| replicaManager.stopReplicas(2, 0, 0, |
| mutable.Map(tp0 -> new StopReplicaPartitionState() |
| .setPartitionIndex(tp0.partition) |
| .setDeletePartition(true) |
| .setLeaderEpoch(LeaderAndIsr.EpochDuringDelete))) |
| |
| assertNotNull(produceResult.get) |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error) |
| } |
| |
| @Test |
| def testVerificationForTransactionalPartitions(): Unit = { |
| val tp = new TopicPartition(topic, 0) |
| val transactionalId = "txn1" |
| val producerId = 24L |
| val producerEpoch = 0.toShort |
| val sequence = 0 |
| |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| val metadataCache = mock(classOf[MetadataCache]) |
| val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) |
| |
| val replicaManager = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager, |
| addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) |
| |
| try { |
| val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1, List(0, 1))) |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| // We must set up the metadata cache to handle the append and verification. |
| val metadataResponseTopic = Seq(new MetadataResponseTopic() |
| .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) |
| .setPartitions(Seq( |
| new MetadataResponsePartition() |
| .setPartitionIndex(0) |
| .setLeaderId(0)).asJava)) |
| val node = new Node(0, "host1", 0) |
| |
| when(metadataCache.contains(tp)).thenReturn(true) |
| when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic) |
| when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node)) |
| when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None) |
| |
| // We will attempt to schedule to the request handler thread from a non-request-handler thread. Set this to avoid the error. |
| KafkaRequestHandler.setBypassThreadCheck(true) |
| |
| // Append some transactional records. |
| val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| val result = appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) |
| |
| val transactionToAdd = new AddPartitionsToTxnTransaction() |
| .setTransactionalId(transactionalId) |
| .setProducerId(producerId) |
| .setProducerEpoch(producerEpoch) |
| .setVerifyOnly(true) |
| .setTopics(new AddPartitionsToTxnTopicCollection( |
| Seq(new AddPartitionsToTxnTopic().setName(tp.topic).setPartitions(Collections.singletonList(tp.partition))).iterator.asJava |
| )) |
| |
| val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback]) |
| // We should add these partitions to the manager to verify. |
| verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), appendCallback.capture()) |
| |
| // Confirm we did not write to the log and instead returned an error. |
| val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue() |
| callback(Map(tp -> Errors.INVALID_RECORD).toMap) |
| assertEquals(Errors.INVALID_RECORD, result.assertFired.error) |
| |
| // If we supply no transactional ID and only idempotent records, we do not verify, so the counter stays the same. |
| val idempotentRecords2 = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| appendRecords(replicaManager, tp, idempotentRecords2) |
| verify(addPartitionsToTxnManager, times(1)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]()) |
| |
| // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records. |
| appendRecordsToMultipleTopics(replicaManager, Map(tp -> transactionalRecords, new TopicPartition(topic, 1) -> idempotentRecords2), transactionalId, Some(0)) |
| verify(addPartitionsToTxnManager, times(2)).addTxnData(ArgumentMatchers.eq(node), ArgumentMatchers.eq(transactionToAdd), any[AddPartitionsToTxnManager.AppendCallback]()) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testExceptionWhenUnverifiedTransactionHasMultipleProducerIds(): Unit = { |
| val tp0 = new TopicPartition(topic, 0) |
| val tp1 = new TopicPartition(topic, 1) |
| val transactionalId = "txn1" |
| val producerId = 24L |
| val producerEpoch = 0.toShort |
| val sequence = 0 |
| |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| val metadataCache = mock(classOf[MetadataCache]) |
| val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) |
| |
| val replicaManager = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager, |
| addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) |
| |
| try { |
| replicaManager.becomeLeaderOrFollower(1, |
| makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), |
| (_, _) => ()) |
| |
| replicaManager.becomeLeaderOrFollower(1, |
| makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), LeaderAndIsr(1, List(0, 1))), |
| (_, _) => ()) |
| |
| // Append some transactional records with different producer IDs |
| val transactionalRecords = mutable.Map[TopicPartition, MemoryRecords]() |
| transactionalRecords.put(tp0, MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes))) |
| transactionalRecords.put(tp1, MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId + 1, producerEpoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes))) |
| |
| assertThrows(classOf[InvalidPidMappingException], |
| () => appendRecordsToMultipleTopics(replicaManager, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0))) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testDisabledVerification(): Unit = { |
| val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) |
| props.put("transaction.partition.verification.enable", "false") |
| val config = KafkaConfig.fromProps(props) |
| |
| val tp = new TopicPartition(topic, 0) |
| val transactionalId = "txn1" |
| val producerId = 24L |
| val producerEpoch = 0.toShort |
| val sequence = 0 |
| |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| val metadataCache = mock(classOf[MetadataCache]) |
| val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) |
| |
| val replicaManager = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager, |
| addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) |
| |
| try { |
| val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(0, List(0, 1))) |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| when(metadataCache.contains(tp)).thenReturn(true) |
| |
| val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, |
| new SimpleRecord(s"message $sequence".getBytes)) |
| appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId, transactionStatePartition = Some(0)) |
| |
| // We should not add these partitions to the manager to verify. |
| verify(metadataCache, times(0)).getTopicMetadata(any(), any(), any(), any()) |
| verify(metadataCache, times(0)).getAliveBrokerNode(any(), any()) |
| verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), any()) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testGetTransactionCoordinator(): Unit = { |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| |
| val metadataCache = mock(classOf[MetadataCache]) |
| |
| val replicaManager = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| |
| try { |
| val txnCoordinatorPartition0 = 0 |
| val txnCoordinatorPartition1 = 1 |
| |
| // Before we set up the metadata cache, return nothing for the topic. |
| when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(Seq()) |
| assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) |
| |
| // Return an error response. |
| when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)). |
| thenReturn(Seq(new MetadataResponseTopic().setErrorCode(Errors.UNSUPPORTED_VERSION.code))) |
| assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) |
| |
| val metadataResponseTopic = Seq(new MetadataResponseTopic() |
| .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) |
| .setPartitions(Seq( |
| new MetadataResponsePartition() |
| .setPartitionIndex(0) |
| .setLeaderId(0), |
| new MetadataResponsePartition() |
| .setPartitionIndex(1) |
| .setLeaderId(1)).asJava)) |
| val node0 = new Node(0, "host1", 0) |
| |
| when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic) |
| when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node0)) |
| when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None) |
| |
| assertEquals((Errors.NONE, node0), replicaManager.getTransactionCoordinator(txnCoordinatorPartition0)) |
| assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), replicaManager.getTransactionCoordinator(txnCoordinatorPartition1)) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
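| /** |
| * Appends `numOfRecords` simple records to the given partition with acks=-1 and returns a |
| * reference that is populated with the PartitionResponse once the produce callback fires. |
| */ |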
| private def sendProducerAppend( |
| replicaManager: ReplicaManager, |
| topicPartition: TopicPartition, |
| numOfRecords: Int |
| ): AtomicReference[PartitionResponse] = { |
| val produceResult = new AtomicReference[PartitionResponse]() |
| def callback(response: Map[TopicPartition, PartitionResponse]): Unit = { |
| produceResult.set(response(topicPartition)) |
| } |
| |
| val records = MemoryRecords.withRecords( |
| CompressionType.NONE, |
| IntStream |
| .range(0, numOfRecords) |
| .mapToObj(i => new SimpleRecord(i.toString.getBytes)) |
| .toArray(Array.ofDim[SimpleRecord]): _* |
| ) |
| |
| replicaManager.appendRecords( |
| timeout = 10, |
| requiredAcks = -1, |
| internalTopicsAllowed = false, |
| origin = AppendOrigin.CLIENT, |
| entriesPerPartition = Map(topicPartition -> records), |
| responseCallback = callback |
| ) |
| produceResult |
| } |
| |
| /** |
| * This method assumes that the test using the created ReplicaManager calls |
| * ReplicaManager.becomeLeaderOrFollower() once with a LeaderAndIsrRequest containing |
| * leader epoch 'leaderEpochInLeaderAndIsr' for partition 'topicPartition'. |
| */ |
| private def prepareReplicaManagerAndLogManager(timer: MockTimer, |
| topicPartition: Int, |
| leaderEpochInLeaderAndIsr: Int, |
| followerBrokerId: Int, |
| leaderBrokerId: Int, |
| countDownLatch: CountDownLatch, |
| expectTruncation: Boolean, |
| localLogOffset: Option[Long] = None, |
| offsetFromLeader: Long = 5, |
| leaderEpochFromLeader: Int = 3, |
| extraProps: Properties = new Properties(), |
| topicId: Option[Uuid] = None): (ReplicaManager, LogManager) = { |
| val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) |
| props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) |
| props.asScala ++= extraProps.asScala |
| val config = KafkaConfig.fromProps(props) |
| val logConfig = new LogConfig(new Properties) |
| val logDir = new File(new File(config.logDirs.head), s"$topic-$topicPartition") |
| Files.createDirectories(logDir.toPath) |
| val mockScheduler = new MockScheduler(time) |
| val mockBrokerTopicStats = new BrokerTopicStats |
| val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) |
| val tp = new TopicPartition(topic, topicPartition) |
| val maxTransactionTimeoutMs = 30000 |
| val maxProducerIdExpirationMs = 30000 |
| val segments = new LogSegments(tp) |
| val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.recordVersion, "") |
| val producerStateManager = new ProducerStateManager(tp, logDir, |
| maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, true), time) |
| val offsets = new LogLoader( |
| logDir, |
| tp, |
| logConfig, |
| mockScheduler, |
| time, |
| mockLogDirFailureChannel, |
| hadCleanShutdown = true, |
| segments, |
| 0L, |
| 0L, |
| leaderEpochCache, |
| producerStateManager |
| ).load() |
| val localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint, |
| offsets.nextOffsetMetadata, mockScheduler, time, tp, mockLogDirFailureChannel) |
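| // Build a UnifiedLog whose end offset and epoch lookups can be overridden via localLogOffset, |
| // so the follower can appear to have local data beyond the leader's offset and trigger truncation. |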
| val mockLog = new UnifiedLog( |
| logStartOffset = offsets.logStartOffset, |
| localLog = localLog, |
| brokerTopicStats = mockBrokerTopicStats, |
| producerIdExpirationCheckIntervalMs = 30000, |
| leaderEpochCache = leaderEpochCache, |
| producerStateManager = producerStateManager, |
| _topicId = topicId, |
| keepPartitionMetadataFile = true) { |
| |
| override def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = { |
| assertEquals(leaderEpoch, leaderEpochFromLeader) |
| localLogOffset.map { logOffset => |
| Some(new OffsetAndEpoch(logOffset, leaderEpochFromLeader)) |
| }.getOrElse(super.endOffsetForEpoch(leaderEpoch)) |
| } |
| |
| override def latestEpoch: Option[Int] = Some(leaderEpochFromLeader) |
| |
| override def logEndOffsetMetadata: LogOffsetMetadata = |
| localLogOffset.map(new LogOffsetMetadata(_)).getOrElse(super.logEndOffsetMetadata) |
| |
| override def logEndOffset: Long = localLogOffset.getOrElse(super.logEndOffset) |
| } |
| |
| // Expect to call LogManager.truncateTo exactly once |
| val topicPartitionObj = new TopicPartition(topic, topicPartition) |
| val mockLogMgr: LogManager = mock(classOf[LogManager]) |
| when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile)) |
| when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any())).thenReturn(mockLog) |
| when(mockLogMgr.getLog(topicPartitionObj, isFuture = false)).thenReturn(Some(mockLog)) |
| when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None) |
| val allLogs = new Pool[TopicPartition, UnifiedLog]() |
| allLogs.put(topicPartitionObj, mockLog) |
| when(mockLogMgr.allLogs).thenReturn(allLogs.values) |
| when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true) |
| |
| val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId) |
| val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId)) |
| |
| val metadataCache: MetadataCache = mock(classOf[MetadataCache]) |
| mockGetAliveBrokerFunctions(metadataCache, aliveBrokers) |
| when(metadataCache.getPartitionReplicaEndpoints( |
| any[TopicPartition], any[ListenerName])). |
| thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), |
| followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap) |
| when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) |
| val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( |
| purgatoryName = "Produce", timer, reaperEnabled = false) |
| val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( |
| purgatoryName = "Fetch", timer, reaperEnabled = false) |
| val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( |
| purgatoryName = "DeleteRecords", timer, reaperEnabled = false) |
| val mockElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( |
| purgatoryName = "ElectLeader", timer, reaperEnabled = false) |
| |
| // Mock network client to show a leader end offset of `offsetFromLeader` for epoch `leaderEpochFromLeader` |
| val blockingSend = new MockBlockingSender( |
| Map(topicPartitionObj -> new EpochEndOffset() |
| .setPartition(topicPartitionObj.partition) |
| .setErrorCode(Errors.NONE.code) |
| .setLeaderEpoch(leaderEpochFromLeader) |
| .setEndOffset(offsetFromLeader)).asJava, |
| BrokerEndPoint(1, "host1", 1), time) |
| val replicaManager = new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = mockScheduler, |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| brokerTopicStats = mockBrokerTopicStats, |
| metadataCache = metadataCache, |
| logDirFailureChannel = mockLogDirFailureChannel, |
| alterPartitionManager = alterPartitionManager, |
| delayedProducePurgatoryParam = Some(mockProducePurgatory), |
| delayedFetchPurgatoryParam = Some(mockFetchPurgatory), |
| delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), |
| delayedElectLeaderPurgatoryParam = Some(mockElectLeaderPurgatory), |
| threadNamePrefix = Option(this.getClass.getName)) { |
| |
| override protected def createReplicaFetcherManager(metrics: Metrics, |
| time: Time, |
| threadNamePrefix: Option[String], |
| replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = { |
| new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion(), () => 1) { |
| |
| override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = { |
| val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " + |
| s"fetcherId=$fetcherId] ") |
| val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id) |
| val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config, |
| replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion, () => 1) |
| new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, replicaManager, |
| quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) { |
| override def doWork(): Unit = { |
| // In case the thread starts before the partition is added by AbstractFetcherManager, |
| // add it here (it's a no-op if already added) |
| val initialOffset = InitialFetchState( |
| topicId = topicId, |
| leader = new BrokerEndPoint(0, "localhost", 9092), |
| initOffset = 0L, currentLeaderEpoch = leaderEpochInLeaderAndIsr) |
| addPartitions(Map(new TopicPartition(topic, topicPartition) -> initialOffset)) |
| super.doWork() |
| |
| // Shut the thread down after one iteration to avoid double-counting truncations |
| initiateShutdown() |
| countDownLatch.countDown() |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| (replicaManager, mockLogMgr) |
| } |
| |
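| /** |
| * Builds a LeaderAndIsrPartitionState for `topicPartition` with the given leader, epoch and |
| * replica set, reusing the test's controllerEpoch and zkVersion for the remaining fields. |
| */ |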
| private def leaderAndIsrPartitionState(topicPartition: TopicPartition, |
| leaderEpoch: Int, |
| leaderBrokerId: Int, |
| aliveBrokerIds: Seq[Integer], |
| isNew: Boolean = false): LeaderAndIsrPartitionState = { |
| new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(topicPartition.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(leaderBrokerId) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(aliveBrokerIds.asJava) |
| .setPartitionEpoch(zkVersion) |
| .setReplicas(aliveBrokerIds.asJava) |
| .setIsNew(isNew) |
| } |
| |
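| /** |
| * Captures the value passed to an asynchronous callback so tests can assert whether the |
| * callback fired and inspect the value it fired with. |
| */ |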
| private class CallbackResult[T] { |
| private var value: Option[T] = None |
| private var fun: Option[T => Unit] = None |
| |
| def assertFired: T = { |
| assertTrue(hasFired, "Callback has not been fired") |
| value.get |
| } |
| |
| def hasFired: Boolean = { |
| value.isDefined |
| } |
| |
| def fire(value: T): Unit = { |
| this.value = Some(value) |
| fun.foreach(f => f(value)) |
| } |
| |
| def onFire(fun: T => Unit): CallbackResult[T] = { |
| this.fun = Some(fun) |
| if (this.hasFired) fire(value.get) |
| this |
| } |
| } |
| |
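| /** |
| * Appends records to a single partition through ReplicaManager.appendRecords and returns a |
| * CallbackResult that fires with the PartitionResponse for that partition. |
| */ |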
| private def appendRecords(replicaManager: ReplicaManager, |
| partition: TopicPartition, |
| records: MemoryRecords, |
| origin: AppendOrigin = AppendOrigin.CLIENT, |
| requiredAcks: Short = -1, |
| transactionalId: String = null, |
| transactionStatePartition: Option[Int] = None): CallbackResult[PartitionResponse] = { |
| val result = new CallbackResult[PartitionResponse]() |
| def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = { |
| val response = responses.get(partition) |
| assertTrue(response.isDefined) |
| result.fire(response.get) |
| } |
| |
| replicaManager.appendRecords( |
| timeout = 1000, |
| requiredAcks = requiredAcks, |
| internalTopicsAllowed = false, |
| origin = origin, |
| entriesPerPartition = Map(partition -> records), |
| responseCallback = appendCallback, |
| transactionalId = transactionalId, |
| transactionStatePartition = transactionStatePartition) |
| |
| result |
| } |
| |
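| /** |
| * Appends the given records to several partitions in a single ReplicaManager.appendRecords |
| * call, optionally as part of a transaction identified by `transactionalId`. |
| */ |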
| private def appendRecordsToMultipleTopics(replicaManager: ReplicaManager, |
| entriesToAppend: Map[TopicPartition, MemoryRecords], |
| transactionalId: String, |
| transactionStatePartition: Option[Int], |
| origin: AppendOrigin = AppendOrigin.CLIENT, |
| requiredAcks: Short = -1): Unit = { |
| def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = { |
| responses.foreach(response => assertTrue(responses.get(response._1).isDefined)) |
| } |
| |
| replicaManager.appendRecords( |
| timeout = 1000, |
| requiredAcks = requiredAcks, |
| internalTopicsAllowed = false, |
| origin = origin, |
| entriesPerPartition = entriesToAppend, |
| responseCallback = appendCallback, |
| transactionalId = transactionalId, |
| transactionStatePartition = transactionStatePartition) |
| } |
| |
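| /** |
| * Fetches a single partition as an ordinary consumer, mapping the requested isolation level |
| * to the corresponding FetchIsolation. |
| */ |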
| private def fetchPartitionAsConsumer( |
| replicaManager: ReplicaManager, |
| partition: TopicIdPartition, |
| partitionData: PartitionData, |
| maxWaitMs: Long = 0, |
| minBytes: Int = 1, |
| maxBytes: Int = 1024 * 1024, |
| isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED, |
| clientMetadata: Option[ClientMetadata] = None, |
| ): CallbackResult[FetchPartitionData] = { |
| val isolation = isolationLevel match { |
| case IsolationLevel.READ_COMMITTED => FetchIsolation.TXN_COMMITTED |
| case IsolationLevel.READ_UNCOMMITTED => FetchIsolation.HIGH_WATERMARK |
| } |
| |
| fetchPartition( |
| replicaManager, |
| replicaId = FetchRequest.ORDINARY_CONSUMER_ID, |
| partition, |
| partitionData, |
| minBytes, |
| maxBytes, |
| isolation, |
| clientMetadata, |
| maxWaitMs |
| ) |
| } |
| |
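  // Issues a fetch for a single partition on behalf of a follower replica, using
  // log-end-offset isolation as followers do.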
| private def fetchPartitionAsFollower( |
| replicaManager: ReplicaManager, |
| partition: TopicIdPartition, |
| partitionData: PartitionData, |
| replicaId: Int, |
| maxWaitMs: Long = 0, |
| minBytes: Int = 1, |
| maxBytes: Int = 1024 * 1024, |
| ): CallbackResult[FetchPartitionData] = { |
| fetchPartition( |
| replicaManager, |
| replicaId = replicaId, |
| partition, |
| partitionData, |
| minBytes = minBytes, |
| maxBytes = maxBytes, |
| isolation = FetchIsolation.LOG_END, |
| clientMetadata = None, |
| maxWaitMs = maxWaitMs |
| ) |
| } |
| |
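  // Fetches a single partition and returns a CallbackResult holding its FetchPartitionData,
  // asserting that the fetch response contains exactly that one partition.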
| private def fetchPartition( |
| replicaManager: ReplicaManager, |
| replicaId: Int, |
| partition: TopicIdPartition, |
| partitionData: PartitionData, |
| minBytes: Int, |
| maxBytes: Int, |
| isolation: FetchIsolation, |
| clientMetadata: Option[ClientMetadata], |
| maxWaitMs: Long |
| ): CallbackResult[FetchPartitionData] = { |
| val result = new CallbackResult[FetchPartitionData]() |
| def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { |
| assertEquals(1, responseStatus.size) |
| val (topicPartition, fetchData) = responseStatus.head |
| assertEquals(partition, topicPartition) |
| result.fire(fetchData) |
| } |
| |
| fetchPartitions( |
| replicaManager, |
| replicaId = replicaId, |
| fetchInfos = Seq(partition -> partitionData), |
| responseCallback = fetchCallback, |
| maxWaitMs = maxWaitMs, |
| minBytes = minBytes, |
| maxBytes = maxBytes, |
| isolation = isolation, |
| clientMetadata = clientMetadata |
| ) |
| |
| result |
| } |
| |
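  // Invokes ReplicaManager.fetchMessages for one or more partitions with the given fetch
  // parameters, quota and isolation.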
| private def fetchPartitions( |
| replicaManager: ReplicaManager, |
| replicaId: Int, |
| fetchInfos: Seq[(TopicIdPartition, PartitionData)], |
| responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, |
| requestVersion: Short = ApiKeys.FETCH.latestVersion, |
| maxWaitMs: Long = 0, |
| minBytes: Int = 1, |
| maxBytes: Int = 1024 * 1024, |
| quota: ReplicaQuota = UnboundedQuota, |
| isolation: FetchIsolation = FetchIsolation.LOG_END, |
| clientMetadata: Option[ClientMetadata] = None |
| ): Unit = { |
| val params = new FetchParams( |
| requestVersion, |
| replicaId, |
| 1, |
| maxWaitMs, |
| minBytes, |
| maxBytes, |
| isolation, |
| clientMetadata.asJava |
| ) |
| |
| replicaManager.fetchMessages( |
| params, |
| fetchInfos, |
| quota, |
| responseCallback |
| ) |
| } |
| |
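  // Creates a ReplicaManager backed by a mocked MetadataCache and reaper-disabled purgatories
  // driven by the supplied MockTimer; optional mocks can replace the replica fetcher manager
  // and the replica alter-log-dirs manager.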
| private def setupReplicaManagerWithMockedPurgatories( |
| timer: MockTimer, |
| brokerId: Int = 0, |
| aliveBrokerIds: Seq[Int] = Seq(0, 1), |
| propsModifier: Properties => Unit = _ => {}, |
| mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None, |
| mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None, |
| isShuttingDown: AtomicBoolean = new AtomicBoolean(false), |
| enableRemoteStorage: Boolean = false |
| ): ReplicaManager = { |
| val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect) |
| props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath) |
| propsModifier.apply(props) |
| val config = KafkaConfig.fromProps(props) |
| val logProps = new Properties() |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps)) |
| val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId)) |
| |
| val metadataCache: MetadataCache = mock(classOf[MetadataCache]) |
| when(metadataCache.topicIdInfo()).thenReturn((topicIds.asJava, topicNames.asJava)) |
| when(metadataCache.topicNamesToIds()).thenReturn(topicIds.asJava) |
| when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava) |
| when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) |
| mockGetAliveBrokerFunctions(metadataCache, aliveBrokers) |
| val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( |
| purgatoryName = "Produce", timer, reaperEnabled = false) |
| val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( |
| purgatoryName = "Fetch", timer, reaperEnabled = false) |
| val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( |
| purgatoryName = "DeleteRecords", timer, reaperEnabled = false) |
| val mockDelayedElectLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectLeader]( |
| purgatoryName = "DelayedElectLeader", timer, reaperEnabled = false) |
| |
| new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = scheduler, |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = metadataCache, |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager, |
| isShuttingDown = isShuttingDown, |
| delayedProducePurgatoryParam = Some(mockProducePurgatory), |
| delayedFetchPurgatoryParam = Some(mockFetchPurgatory), |
| delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), |
| delayedElectLeaderPurgatoryParam = Some(mockDelayedElectLeaderPurgatory), |
| threadNamePrefix = Option(this.getClass.getName), |
| remoteLogManager = if (enableRemoteStorage) Some(mockRemoteLogManager) else None) { |
| |
| override protected def createReplicaFetcherManager( |
| metrics: Metrics, |
| time: Time, |
| threadNamePrefix: Option[String], |
| quotaManager: ReplicationQuotaManager |
| ): ReplicaFetcherManager = { |
| mockReplicaFetcherManager.getOrElse { |
| super.createReplicaFetcherManager( |
| metrics, |
| time, |
| threadNamePrefix, |
| quotaManager |
| ) |
| } |
| } |
| |
| override def createReplicaAlterLogDirsManager( |
| quotaManager: ReplicationQuotaManager, |
| brokerTopicStats: BrokerTopicStats |
| ): ReplicaAlterLogDirsManager = { |
| mockReplicaAlterLogDirsManager.getOrElse { |
| super.createReplicaAlterLogDirsManager( |
| quotaManager, |
| brokerTopicStats |
| ) |
| } |
| } |
| } |
| } |
| |
| @Test |
| def testOldLeaderLosesMetricsWhenReassignPartitions(): Unit = { |
| val controllerEpoch = 0 |
| val leaderEpoch = 0 |
| val leaderEpochIncrement = 1 |
| val correlationId = 0 |
| val controllerId = 0 |
| val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats]) |
| val (rm0, rm1) = prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), mockTopicStats1) |
| |
| try { |
| // make broker 0 the leader of partition 0 and |
| // make broker 1 the leader of partition 1 |
| val tp0 = new TopicPartition(topic, 0) |
| val tp1 = new TopicPartition(topic, 1) |
| val partition0Replicas = Seq[Integer](0, 1).asJava |
| val partition1Replicas = Seq[Integer](1, 0).asJava |
| val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava |
| |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, |
| controllerId, 0, brokerEpoch, |
| Seq( |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true), |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp1.topic) |
| .setPartitionIndex(tp1.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(1) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(partition1Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition1Replicas) |
| .setIsNew(true) |
| ).asJava, |
| topicIds, |
| Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() |
| |
| rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ()) |
| rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ()) |
| |
| // make broker 0 the leader of partition 1 so broker 1 loses its leadership position |
| val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, |
| controllerEpoch, brokerEpoch, |
| Seq( |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch + leaderEpochIncrement) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true), |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp1.topic) |
| .setPartitionIndex(tp1.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch + leaderEpochIncrement) |
| .setIsr(partition1Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition1Replicas) |
| .setIsNew(true) |
| ).asJava, |
| topicIds, |
| Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() |
| |
| rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ()) |
| rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ()) |
| } finally { |
| rm0.shutdown() |
| rm1.shutdown() |
| } |
| |
    // verify that broker 1 removed its leader metrics once it was no longer the leader of partition 1
| verify(mockTopicStats1).removeOldLeaderMetrics(topic) |
| } |
| |
| @Test |
| def testOldFollowerLosesMetricsWhenReassignPartitions(): Unit = { |
| val controllerEpoch = 0 |
| val leaderEpoch = 0 |
| val leaderEpochIncrement = 1 |
| val correlationId = 0 |
| val controllerId = 0 |
| val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats]) |
| val (rm0, rm1) = prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), mockTopicStats1) |
| |
| try { |
      // make broker 1 the leader of both partitions so that
      // broker 0 starts out as a follower for partition 0 and partition 1
| val tp0 = new TopicPartition(topic, 0) |
| val tp1 = new TopicPartition(topic, 1) |
| val partition0Replicas = Seq[Integer](1, 0).asJava |
| val partition1Replicas = Seq[Integer](1, 0).asJava |
| val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava |
| |
| val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, |
| controllerId, 0, brokerEpoch, |
| Seq( |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(1) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true), |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp1.topic) |
| .setPartitionIndex(tp1.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(1) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(partition1Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition1Replicas) |
| .setIsNew(true) |
| ).asJava, |
| topicIds, |
| Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() |
| |
| rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ()) |
| rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ()) |
| |
      // make broker 0 the leader of both partitions so broker 1 loses its leadership and broker 0 is no longer a follower
| val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, |
| controllerEpoch, brokerEpoch, |
| Seq( |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp0.topic) |
| .setPartitionIndex(tp0.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch + leaderEpochIncrement) |
| .setIsr(partition0Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition0Replicas) |
| .setIsNew(true), |
| new LeaderAndIsrPartitionState() |
| .setTopicName(tp1.topic) |
| .setPartitionIndex(tp1.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch + leaderEpochIncrement) |
| .setIsr(partition1Replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(partition1Replicas) |
| .setIsNew(true) |
| ).asJava, |
| topicIds, |
| Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build() |
| |
| rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ()) |
| rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ()) |
| } finally { |
| rm0.shutdown() |
| rm1.shutdown() |
| } |
| |
    // verify that broker 1 removed both its old leader and follower metrics for the topic after the reassignment
| verify(mockTopicStats1).removeOldLeaderMetrics(topic) |
| verify(mockTopicStats1).removeOldFollowerMetrics(topic) |
| } |
| |
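  // Creates two ReplicaManagers, one per broker (ids 0 and 1), each with its own config, log
  // manager, metadata cache and the provided BrokerTopicStats instance.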
| private def prepareDifferentReplicaManagers(brokerTopicStats1: BrokerTopicStats, |
| brokerTopicStats2: BrokerTopicStats): (ReplicaManager, ReplicaManager) = { |
| val props0 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) |
| val props1 = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) |
| |
| props0.put("log0.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) |
| props1.put("log1.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) |
| |
| val config0 = KafkaConfig.fromProps(props0) |
| val config1 = KafkaConfig.fromProps(props1) |
| |
| val mockLogMgr0 = TestUtils.createLogManager(config0.logDirs.map(new File(_))) |
| val mockLogMgr1 = TestUtils.createLogManager(config1.logDirs.map(new File(_))) |
| |
| val metadataCache0: MetadataCache = mock(classOf[MetadataCache]) |
| val metadataCache1: MetadataCache = mock(classOf[MetadataCache]) |
| val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1)) |
| mockGetAliveBrokerFunctions(metadataCache0, aliveBrokers) |
| mockGetAliveBrokerFunctions(metadataCache1, aliveBrokers) |
| when(metadataCache0.metadataVersion()).thenReturn(config0.interBrokerProtocolVersion) |
| when(metadataCache1.metadataVersion()).thenReturn(config1.interBrokerProtocolVersion) |
| |
| // each replica manager is for a broker |
| val rm0 = new ReplicaManager( |
| metrics = metrics, |
| config = config0, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr0, |
| quotaManagers = quotaManager, |
| brokerTopicStats = brokerTopicStats1, |
| metadataCache = metadataCache0, |
| logDirFailureChannel = new LogDirFailureChannel(config0.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| val rm1 = new ReplicaManager( |
| metrics = metrics, |
| config = config1, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr1, |
| quotaManagers = quotaManager, |
| brokerTopicStats = brokerTopicStats2, |
| metadataCache = metadataCache1, |
| logDirFailureChannel = new LogDirFailureChannel(config1.logDirs.size), |
| alterPartitionManager = alterPartitionManager) |
| |
| (rm0, rm1) |
| } |
| |
| @Test |
| def testStopReplicaWithStaleControllerEpoch(): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 10, brokerEpoch, |
| Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava, |
| Collections.singletonMap(topic, Uuid.randomUuid()), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava |
| ).build() |
| |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val partitionStates = Map(tp0 -> new StopReplicaPartitionState() |
| .setPartitionIndex(tp0.partition) |
| .setLeaderEpoch(1) |
| .setDeletePartition(false) |
| ) |
| |
| val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates) |
| assertEquals(Errors.STALE_CONTROLLER_EPOCH, error) |
| } |
| |
| @Test |
| def testStopReplicaWithOfflinePartition(): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava, |
| Collections.singletonMap(topic, Uuid.randomUuid()), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava |
| ).build() |
| |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| replicaManager.markPartitionOffline(tp0) |
| |
| val partitionStates = Map(tp0 -> new StopReplicaPartitionState() |
| .setPartitionIndex(tp0.partition) |
| .setLeaderEpoch(1) |
| .setDeletePartition(false) |
| ) |
| |
| val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates) |
| assertEquals(Errors.NONE, error) |
| assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result) |
| } |
| |
| @Test |
| def testStopReplicaWithInexistentPartition(): Unit = { |
| testStopReplicaWithInexistentPartition(false, false) |
| } |
| |
| @Test |
| def testStopReplicaWithInexistentPartitionAndPartitionsDelete(): Unit = { |
| testStopReplicaWithInexistentPartition(true, false) |
| } |
| |
| @Test |
| def testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(): Unit = { |
| testStopReplicaWithInexistentPartition(true, true) |
| } |
| |
| private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean, throwIOException: Boolean): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val log = replicaManager.logManager.getOrCreateLog(tp0, true, topicId = None) |
| |
| if (throwIOException) { |
      // Delete the underlying directory and replace it with a plain file to trigger a KafkaStorageException
| val dir = log.dir.getParentFile |
| Utils.delete(dir) |
| Files.createFile(dir.toPath) |
| } |
| |
| val partitionStates = Map(tp0 -> new StopReplicaPartitionState() |
| .setPartitionIndex(tp0.partition) |
| .setLeaderEpoch(1) |
| .setDeletePartition(deletePartitions) |
| ) |
| |
| val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates) |
| assertEquals(Errors.NONE, error) |
| |
| if (throwIOException && deletePartitions) { |
| assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result) |
| assertTrue(replicaManager.logManager.getLog(tp0).isEmpty) |
| } else if (deletePartitions) { |
| assertEquals(Map(tp0 -> Errors.NONE), result) |
| assertTrue(replicaManager.logManager.getLog(tp0).isEmpty) |
| } else { |
| assertEquals(Map(tp0 -> Errors.NONE), result) |
| assertTrue(replicaManager.logManager.getLog(tp0).isDefined) |
| } |
| } |
| |
| @Test |
| def testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(): Unit = { |
| testStopReplicaWithExistingPartition(2, false, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(): Unit = { |
| testStopReplicaWithExistingPartition(0, false, false, Errors.FENCED_LEADER_EPOCH) |
| } |
| |
| @Test |
| def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = { |
| testStopReplicaWithExistingPartition(1, false, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithExistingPartitionAndDeleteSentinel(): Unit = { |
| testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, false, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(): Unit = { |
| testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch(): Unit = { |
| testStopReplicaWithExistingPartition(2, true, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException(): Unit = { |
| testStopReplicaWithExistingPartition(2, true, true, Errors.KAFKA_STORAGE_ERROR) |
| } |
| |
| @Test |
| def testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch(): Unit = { |
| testStopReplicaWithExistingPartition(0, true, false, Errors.FENCED_LEADER_EPOCH) |
| } |
| |
| @Test |
| def testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(): Unit = { |
| testStopReplicaWithExistingPartition(1, true, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(): Unit = { |
| testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true, false, Errors.NONE) |
| } |
| |
| @Test |
| def testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided(): Unit = { |
| testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, Errors.NONE) |
| } |
| |
| private def testStopReplicaWithExistingPartition(leaderEpoch: Int, |
| deletePartition: Boolean, |
| throwIOException: Boolean, |
| expectedOutput: Errors): Unit = { |
| val mockTimer = new MockTimer(time) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) |
| |
| val tp0 = new TopicPartition(topic, 0) |
| val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) |
| val partition = replicaManager.createPartition(tp0) |
| partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) |
| |
| val logDirFailureChannel = new LogDirFailureChannel(replicaManager.config.logDirs.size) |
| val logDir = partition.log.get.parentDirFile |
| |
| def readRecoveryPointCheckpoint(): Map[TopicPartition, Long] = { |
| new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile), |
| logDirFailureChannel).read() |
| } |
| |
| def readLogStartOffsetCheckpoint(): Map[TopicPartition, Long] = { |
| new OffsetCheckpointFile(new File(logDir, LogManager.LogStartOffsetCheckpointFile), |
| logDirFailureChannel).read() |
| } |
| |
| val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(leaderAndIsrPartitionState(tp0, 1, 0, Seq(0, 1), true)).asJava, |
| Collections.singletonMap(tp0.topic(), Uuid.randomUuid()), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava |
| ).build() |
| |
| replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) |
| |
| val batch = TestUtils.records(records = List( |
| new SimpleRecord(10, "k1".getBytes, "v1".getBytes), |
| new SimpleRecord(11, "k2".getBytes, "v2".getBytes))) |
| partition.appendRecordsToLeader(batch, AppendOrigin.CLIENT, requiredAcks = 0, RequestLocal.withThreadConfinedCaching) |
| partition.log.get.updateHighWatermark(2L) |
| partition.log.get.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.LeaderOffsetIncremented) |
| replicaManager.logManager.checkpointLogRecoveryOffsets() |
| replicaManager.logManager.checkpointLogStartOffsets() |
| assertEquals(Some(1L), readRecoveryPointCheckpoint().get(tp0)) |
| assertEquals(Some(1L), readLogStartOffsetCheckpoint().get(tp0)) |
| |
| if (throwIOException) { |
| // Replace underlying PartitionMetadataFile with a mock which throws |
| // a KafkaStorageException when maybeFlush is called. |
| val mockPartitionMetadataFile = mock(classOf[PartitionMetadataFile]) |
| when(mockPartitionMetadataFile.maybeFlush()).thenThrow(new KafkaStorageException()) |
| partition.log.get.partitionMetadataFile = Some(mockPartitionMetadataFile) |
| } |
| |
| val partitionStates = Map(tp0 -> new StopReplicaPartitionState() |
| .setPartitionIndex(tp0.partition) |
| .setLeaderEpoch(leaderEpoch) |
| .setDeletePartition(deletePartition) |
| ) |
| |
| val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates) |
| assertEquals(Errors.NONE, error) |
| assertEquals(Map(tp0 -> expectedOutput), result) |
| |
| if (expectedOutput == Errors.NONE && deletePartition) { |
| assertEquals(HostedPartition.None, replicaManager.getPartition(tp0)) |
| assertFalse(readRecoveryPointCheckpoint().contains(tp0)) |
| assertFalse(readLogStartOffsetCheckpoint().contains(tp0)) |
| } |
| } |
| |
| @Test |
| def testReplicaNotAvailable(): Unit = { |
| |
| def createReplicaManager(): ReplicaManager = { |
| val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) |
| val config = KafkaConfig.fromProps(props) |
| val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) |
| new ReplicaManager( |
| metrics = metrics, |
| config = config, |
| time = time, |
| scheduler = new MockScheduler(time), |
| logManager = mockLogMgr, |
| quotaManagers = quotaManager, |
| metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion), |
| logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), |
| alterPartitionManager = alterPartitionManager) { |
| override def getPartitionOrException(topicPartition: TopicPartition): Partition = { |
| throw Errors.NOT_LEADER_OR_FOLLOWER.exception() |
| } |
| } |
| } |
| |
| val replicaManager = createReplicaManager() |
| try { |
| val tp = new TopicPartition(topic, 0) |
| val dir = replicaManager.logManager.liveLogDirs.head.getAbsolutePath |
| val errors = replicaManager.alterReplicaLogDirs(Map(tp -> dir)) |
| assertEquals(Errors.REPLICA_NOT_AVAILABLE, errors(tp)) |
| } finally { |
| replicaManager.shutdown(false) |
| } |
| } |
| |
| @Test |
| def testPartitionMetadataFile(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicPartition = new TopicPartition(topic, 0) |
| val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) |
| val topicNames = topicIds.asScala.map(_.swap).asJava |
| |
| def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest = |
| new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(epoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| |
| val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ()) |
| assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition)) |
| assertFalse(replicaManager.localLog(topicPartition).isEmpty) |
| val id = topicIds.get(topicPartition.topic()) |
| val log = replicaManager.localLog(topicPartition).get |
| assertTrue(log.partitionMetadataFile.get.exists()) |
| val partitionMetadata = log.partitionMetadataFile.get.read() |
| |
| // Current version of PartitionMetadataFile is 0. |
| assertEquals(0, partitionMetadata.version) |
| assertEquals(id, partitionMetadata.topicId) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testPartitionMetadataFileCreatedWithExistingLog(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicPartition = new TopicPartition(topic, 0) |
| |
| replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None) |
| |
| assertTrue(replicaManager.getLog(topicPartition).isDefined) |
| var log = replicaManager.getLog(topicPartition).get |
| assertEquals(None, log.topicId) |
| assertFalse(log.partitionMetadataFile.get.exists()) |
| |
| val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) |
| val topicNames = topicIds.asScala.map(_.swap).asJava |
| |
| def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(epoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| |
| val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ()) |
| assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition)) |
| assertFalse(replicaManager.localLog(topicPartition).isEmpty) |
| val id = topicIds.get(topicPartition.topic()) |
| log = replicaManager.localLog(topicPartition).get |
| assertTrue(log.partitionMetadataFile.get.exists()) |
| val partitionMetadata = log.partitionMetadataFile.get.read() |
| |
| // Current version of PartitionMetadataFile is 0. |
| assertEquals(0, partitionMetadata.version) |
| assertEquals(id, partitionMetadata.topicId) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testPartitionMetadataFileCreatedAfterPreviousRequestWithoutIds(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicPartition = new TopicPartition(topic, 0) |
| val topicPartition2 = new TopicPartition(topic, 1) |
| |
| def leaderAndIsrRequest(topicIds: util.Map[String, Uuid], version: Short, partition: Int = 0, leaderEpoch: Int = 0): LeaderAndIsrRequest = |
| new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(partition) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| |
      // Send a request without a topic ID so that the partition's log has no topic ID associated with it.
| val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(Collections.emptyMap(), 4), (_, _) => ()) |
| assertEquals(Errors.NONE, response.partitionErrors(Collections.emptyMap()).get(topicPartition)) |
| assertTrue(replicaManager.localLog(topicPartition).isDefined) |
| val log = replicaManager.localLog(topicPartition).get |
| assertFalse(log.partitionMetadataFile.get.exists()) |
| assertTrue(log.topicId.isEmpty) |
| |
| val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(topicIds.asJava, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ()) |
| assertEquals(Errors.NONE, response2.partitionErrors(topicNames.asJava).get(topicPartition)) |
| assertTrue(replicaManager.localLog(topicPartition).isDefined) |
| assertTrue(log.partitionMetadataFile.get.exists()) |
| assertTrue(log.topicId.isDefined) |
| assertEquals(topicId, log.topicId.get) |
| |
      // Repeat with the second partition (index 1), but in this case also bump the leader epoch.
      // Again send a request without a topic ID so that the partition's log has no topic ID associated with it.
| val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(Collections.emptyMap(), 4, 1), (_, _) => ()) |
| assertEquals(Errors.NONE, response3.partitionErrors(Collections.emptyMap()).get(topicPartition2)) |
| assertTrue(replicaManager.localLog(topicPartition2).isDefined) |
| val log2 = replicaManager.localLog(topicPartition2).get |
| assertFalse(log2.partitionMetadataFile.get.exists()) |
| assertTrue(log2.topicId.isEmpty) |
| |
| val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(topicIds.asJava, ApiKeys.LEADER_AND_ISR.latestVersion, 1, 1), (_, _) => ()) |
| assertEquals(Errors.NONE, response4.partitionErrors(topicNames.asJava).get(topicPartition2)) |
| assertTrue(replicaManager.localLog(topicPartition2).isDefined) |
| assertTrue(log2.partitionMetadataFile.get.exists()) |
| assertTrue(log2.topicId.isDefined) |
| assertEquals(topicId, log2.topicId.get) |
| |
| assertEquals(topicId, log.partitionMetadataFile.get.read().topicId) |
| assertEquals(topicId, log2.partitionMetadataFile.get.read().topicId) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testInconsistentIdReturnsError(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicPartition = new TopicPartition(topic, 0) |
| val topicIds = Collections.singletonMap(topic, Uuid.randomUuid()) |
| val topicNames = topicIds.asScala.map(_.swap).asJava |
| |
| val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid()) |
| val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava |
| |
| def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest = |
| new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(epoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| |
| val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ()) |
| assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition)) |
| |
| val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ()) |
| assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition)) |
| |
| // Send request with inconsistent ID. |
| val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ()) |
| assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition)) |
| |
| val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ()) |
| assertEquals(Errors.INCONSISTENT_TOPIC_ID, response4.partitionErrors(invalidTopicNames).get(topicPartition)) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @Test |
| def testPartitionMetadataFileNotCreated(): Unit = { |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time)) |
| try { |
| val brokerList = Seq[Integer](0, 1).asJava |
| val topicPartition = new TopicPartition(topic, 0) |
| val topicPartitionFoo = new TopicPartition("foo", 0) |
| val topicPartitionFake = new TopicPartition("fakeTopic", 0) |
| val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava |
| val topicNames = topicIds.asScala.map(_.swap).asJava |
| |
| def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse( |
| new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch, |
| Seq(new LeaderAndIsrPartitionState() |
| .setTopicName(name) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(epoch) |
| .setIsr(brokerList) |
| .setPartitionEpoch(0) |
| .setReplicas(brokerList) |
| .setIsNew(true)).asJava, |
| topicIds, |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version) |
| |
| // There is no file if the topic does not have an associated topic ID. |
| val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ()) |
| assertTrue(replicaManager.localLog(topicPartitionFake).isDefined) |
| val log = replicaManager.localLog(topicPartitionFake).get |
| assertFalse(log.partitionMetadataFile.get.exists()) |
| assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition)) |
| |
| // There is no file if the topic has the default UUID. |
| val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ()) |
| assertTrue(replicaManager.localLog(topicPartition).isDefined) |
| val log2 = replicaManager.localLog(topicPartition).get |
| assertFalse(log2.partitionMetadataFile.get.exists()) |
| assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition)) |
| |
      // There is no file if the request is from an older version
| val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ()) |
| assertTrue(replicaManager.localLog(topicPartitionFoo).isDefined) |
| val log3 = replicaManager.localLog(topicPartitionFoo).get |
| assertFalse(log3.partitionMetadataFile.get.exists()) |
| assertEquals(Errors.NONE, response3.partitionErrors(topicNames).get(topicPartitionFoo)) |
| |
      // There is no file if the request is from an older version
| val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, "foo", 4), (_, _) => ()) |
| assertTrue(replicaManager.localLog(topicPartitionFoo).isDefined) |
| val log4 = replicaManager.localLog(topicPartitionFoo).get |
| assertFalse(log4.partitionMetadataFile.get.exists()) |
| assertEquals(Errors.NONE, response4.partitionErrors(topicNames).get(topicPartitionFoo)) |
| } finally replicaManager.shutdown(checkpointHW = false) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testPartitionMarkedOfflineIfLogCantBeCreated(becomeLeader: Boolean): Unit = { |
| val dataDir = TestUtils.tempDir() |
| val topicPartition = new TopicPartition(topic, 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| propsModifier = props => props.put(KafkaConfig.LogDirsProp, dataDir.getAbsolutePath) |
| ) |
| |
| try { |
| // Delete the data directory to trigger a storage exception |
| Utils.delete(dataDir) |
| |
| val request = makeLeaderAndIsrRequest( |
| topicId = Uuid.randomUuid(), |
| topicPartition = topicPartition, |
| replicas = Seq(0, 1), |
| leaderAndIsr = LeaderAndIsr(if (becomeLeader) 0 else 1, List(0, 1)) |
| ) |
| |
| replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ()) |
| |
| assertEquals(HostedPartition.Offline, replicaManager.getPartition(topicPartition)) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
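  // Builds a LeaderAndIsrRequest for a single partition with the given topic id, replica set
  // and LeaderAndIsr state; the live nodes include the controller and all replicas.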
| private def makeLeaderAndIsrRequest( |
| topicId: Uuid, |
| topicPartition: TopicPartition, |
| replicas: Seq[Int], |
| leaderAndIsr: LeaderAndIsr, |
| isNew: Boolean = true, |
| brokerEpoch: Int = 0, |
| controllerId: Int = 0, |
| controllerEpoch: Int = 0, |
| version: Short = LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION |
| ): LeaderAndIsrRequest = { |
| val partitionState = new LeaderAndIsrPartitionState() |
| .setTopicName(topicPartition.topic) |
| .setPartitionIndex(topicPartition.partition) |
| .setControllerEpoch(controllerEpoch) |
| .setLeader(leaderAndIsr.leader) |
| .setLeaderEpoch(leaderAndIsr.leaderEpoch) |
| .setIsr(leaderAndIsr.isr.map(Int.box).asJava) |
| .setPartitionEpoch(leaderAndIsr.partitionEpoch) |
| .setReplicas(replicas.map(Int.box).asJava) |
| .setIsNew(isNew) |
| |
| def mkNode(replicaId: Int): Node = { |
| new Node(replicaId, s"host-$replicaId", 9092) |
| } |
| |
| val nodes = Set(mkNode(controllerId)) ++ replicas.map(mkNode).toSet |
| |
| new LeaderAndIsrRequest.Builder( |
| version, |
| controllerId, |
| controllerEpoch, |
| brokerEpoch, |
| Seq(partitionState).asJava, |
| Map(topicPartition.topic -> topicId).asJava, |
| nodes.asJava |
| ).build() |
| } |
| |
| @Test |
| def testActiveProducerState(): Unit = { |
| val brokerId = 0 |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), brokerId) |
| try { |
| val fooPartition = new TopicPartition("foo", 0) |
| when(replicaManager.metadataCache.contains(fooPartition)).thenReturn(false) |
| val fooProducerState = replicaManager.activeProducerState(fooPartition) |
| assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(fooProducerState.errorCode)) |
| |
| val oofPartition = new TopicPartition("oof", 0) |
| when(replicaManager.metadataCache.contains(oofPartition)).thenReturn(true) |
| val oofProducerState = replicaManager.activeProducerState(oofPartition) |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(oofProducerState.errorCode)) |
| |
| // This API is supported by both leaders and followers |
| |
| val barPartition = new TopicPartition("bar", 0) |
| val barLeaderAndIsrRequest = makeLeaderAndIsrRequest( |
| topicId = Uuid.randomUuid(), |
| topicPartition = barPartition, |
| replicas = Seq(brokerId), |
| leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId)) |
| ) |
| replicaManager.becomeLeaderOrFollower(0, barLeaderAndIsrRequest, (_, _) => ()) |
| val barProducerState = replicaManager.activeProducerState(barPartition) |
| assertEquals(Errors.NONE, Errors.forCode(barProducerState.errorCode)) |
| |
| val otherBrokerId = 1 |
| val bazPartition = new TopicPartition("baz", 0) |
| val bazLeaderAndIsrRequest = makeLeaderAndIsrRequest( |
| topicId = Uuid.randomUuid(), |
| topicPartition = bazPartition, |
| replicas = Seq(brokerId, otherBrokerId), |
| leaderAndIsr = LeaderAndIsr(otherBrokerId, List(brokerId, otherBrokerId)) |
| ) |
| replicaManager.becomeLeaderOrFollower(0, bazLeaderAndIsrRequest, (_, _) => ()) |
| val bazProducerState = replicaManager.activeProducerState(bazPartition) |
| assertEquals(Errors.NONE, Errors.forCode(bazProducerState.errorCode)) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| val FOO_UUID = Uuid.fromString("fFJBx0OmQG-UqeaT6YaSwA") |
| |
| val BAR_UUID = Uuid.fromString("vApAP6y7Qx23VOfKBzbOBQ") |
| |
| @Test |
| def testGetOrCreatePartition(): Unit = { |
| val brokerId = 0 |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), brokerId) |
| val foo0 = new TopicPartition("foo", 0) |
| val emptyDelta = new TopicsDelta(TopicsImage.EMPTY) |
| val (fooPart, fooNew) = replicaManager.getOrCreatePartition(foo0, emptyDelta, FOO_UUID).get |
| assertTrue(fooNew) |
| assertEquals(foo0, fooPart.topicPartition) |
| val (fooPart2, fooNew2) = replicaManager.getOrCreatePartition(foo0, emptyDelta, FOO_UUID).get |
| assertFalse(fooNew2) |
| assertTrue(fooPart eq fooPart2) |
| val bar1 = new TopicPartition("bar", 1) |
| replicaManager.markPartitionOffline(bar1) |
| assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID)) |
| } |
| |
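  // Verifies that the mocked RemoteLogManager was notified of a leadership change with exactly
  // the expected sets of leader and follower partitions.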
| private def verifyRLMOnLeadershipChange(leaderPartitions: util.Set[Partition], followerPartitions: util.Set[Partition]): Unit = { |
| val leaderCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) |
| val followerCapture: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) |
| val topicIdsCapture: ArgumentCaptor[util.Map[String, Uuid]] = ArgumentCaptor.forClass(classOf[util.Map[String, Uuid]]) |
| verify(mockRemoteLogManager).onLeadershipChange(leaderCapture.capture(), followerCapture.capture(), topicIdsCapture.capture()) |
| |
| val actualLeaderPartitions = leaderCapture.getValue |
| val actualFollowerPartitions = followerCapture.getValue |
| |
| assertEquals(leaderPartitions, actualLeaderPartitions) |
| assertEquals(followerPartitions, actualFollowerPartitions) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaFromLeaderToFollower(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val numOfRecords = 3 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the leader |
| val leaderTopicsDelta = topicsCreateDelta(localId, true) |
| val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) |
| val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id |
| val topicIdPartition = new TopicIdPartition(topicId, topicPartition) |
| |
| replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) |
| assertTrue(leaderPartition.isLeader) |
| assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) |
| assertEquals(0, leaderPartition.getLeaderEpoch) |
| |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet()) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Send a produce request and advance the highwatermark |
| val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) |
| fetchPartitionAsFollower( |
| replicaManager, |
| topicIdPartition, |
| new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, Optional.empty()), |
| replicaId = otherId |
| ) |
| assertEquals(Errors.NONE, leaderResponse.get.error) |
| |
| // Change the local replica to follower |
| val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Append on a follower should fail |
| val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(1, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| } |
| |
| val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) |
| assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.leader.brokerEndPoint())) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaFromFollowerToLeader(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val numOfRecords = 3 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the follower |
| val followerTopicsDelta = topicsCreateDelta(localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) |
| assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.leader.brokerEndPoint())) |
| |
| // Append on a follower should fail |
| val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) |
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) |
| |
| // Change the local replica to leader |
| val leaderTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, true) |
| val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) |
| val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id |
| val topicIdPartition = new TopicIdPartition(topicId, topicPartition) |
| replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) |
| |
| // Send a produce request and advance the highwatermark |
| val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) |
| fetchPartitionAsFollower( |
| replicaManager, |
| topicIdPartition, |
| new PartitionData(Uuid.ZERO_UUID, numOfRecords, 0, Int.MaxValue, Optional.empty()), |
| replicaId = otherId |
| ) |
| assertEquals(Errors.NONE, leaderResponse.get.error) |
| |
| val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) |
| assertTrue(leaderPartition.isLeader) |
| assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) |
| assertEquals(1, leaderPartition.getLeaderEpoch) |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet()) |
| } |
| |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaFollowerWithNoChange(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the follower |
| val followerTopicsDelta = topicsCreateDelta(localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) |
| assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.leader.brokerEndPoint())) |
| |
| // Apply the same delta again |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check that the state stays the same |
| val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(noChangePartition.isLeader) |
| assertEquals(0, noChangePartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| } |
| |
| val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) |
| assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), noChangeFetcher.map(_.leader.brokerEndPoint())) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaFollowerToNotReplica(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the follower |
| val followerTopicsDelta = topicsCreateDelta(localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) |
| assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.leader.brokerEndPoint())) |
| |
| // Apply changes that remove replica |
| val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true) |
| val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply()) |
| replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage) |
| |
| verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap()) |
| |
| // Check that the partition was removed |
| assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| assertEquals(None, replicaManager.logManager.getLog(topicPartition)) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaFollowerRemovedTopic(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the follower |
| val followerTopicsDelta = topicsCreateDelta(localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) |
| assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.leader.brokerEndPoint())) |
| |
| // Apply changes that remove topic and replica |
| val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics()) |
| val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply()) |
| replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage) |
| verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap()) |
| |
| // Check that the partition was removed |
| assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| assertEquals(None, replicaManager.logManager.getLog(topicPartition)) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaLeaderToNotReplica(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the leader |
| val leaderTopicsDelta = topicsCreateDelta(localId, true) |
| val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) |
| replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) |
| assertTrue(leaderPartition.isLeader) |
| assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) |
| assertEquals(0, leaderPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet()) |
| reset(mockRemoteLogManager) |
| } |
| |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| |
| // Apply changes that remove the replica
| val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true) |
| val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply()) |
| replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage) |
| verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap()) |
| |
| // Check that the partition was removed |
| assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| assertEquals(None, replicaManager.logManager.getLog(topicPartition)) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaLeaderToRemovedTopic(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the leader |
| val leaderTopicsDelta = topicsCreateDelta(localId, true) |
| val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) |
| replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) |
| assertTrue(leaderPartition.isLeader) |
| assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) |
| assertEquals(0, leaderPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet()) |
| reset(mockRemoteLogManager) |
| } |
| |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| |
| // Apply changes that remove the topic and replica
| val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics()) |
| val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply()) |
| replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage) |
| verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap()) |
| |
| // Check that the partition was removed |
| assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| assertEquals(None, replicaManager.logManager.getLog(topicPartition)) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaToFollowerCompletesProduce(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val numOfRecords = 3 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the leader |
| val leaderTopicsDelta = topicsCreateDelta(localId, true) |
| val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) |
| replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) |
| assertTrue(leaderPartition.isLeader) |
| assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) |
| assertEquals(0, leaderPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet()) |
| reset(mockRemoteLogManager) |
| } |
| |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| |
| // Send a produce request |
| val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) |
| |
| // Change the local replica to follower |
| val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(1, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Check that the produce failed because the replica changed to follower before the records were replicated
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaToFollowerCompletesFetch(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId, enableRemoteStorage = enableRemoteStorage) |
| |
| try { |
| // Make the local replica the leader |
| val leaderTopicsDelta = topicsCreateDelta(localId, true) |
| val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) |
| val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id |
| val topicIdPartition = new TopicIdPartition(topicId, topicPartition) |
| replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) |
| |
| // Check the state of that partition and fetcher |
| val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) |
| assertTrue(leaderPartition.isLeader) |
| assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) |
| assertEquals(0, leaderPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.singleton(leaderPartition), Collections.emptySet()) |
| reset(mockRemoteLogManager) |
| } |
| |
| assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) |
| |
| // Send a fetch request |
| val fetchCallback = fetchPartitionAsFollower( |
| replicaManager, |
| topicIdPartition, |
| new PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, Optional.empty()), |
| replicaId = otherId, |
| minBytes = Int.MaxValue, |
| maxWaitMs = 1000 |
| ) |
| assertFalse(fetchCallback.hasFired) |
| |
| // Change the local replica to follower |
| val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false) |
| val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(1, followerPartition.getLeaderEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Check that the fetch failed because the local replica changed to follower before the fetch was satisfied
| assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaToLeaderOrFollowerMarksPartitionOfflineIfLogCantBeCreated(isStartIdLeader: Boolean): Unit = { |
| val localId = 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| val dataDir = TestUtils.tempDir() |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| brokerId = localId, |
| propsModifier = props => props.put(KafkaConfig.LogDirsProp, dataDir.getAbsolutePath), |
| enableRemoteStorage = true |
| ) |
| |
| try { |
| // Delete the data directory to trigger a storage exception |
| Utils.delete(dataDir) |
| |
| // Make the local replica the leader or the follower, depending on isStartIdLeader
| val topicsDelta = topicsCreateDelta(localId, isStartIdLeader)
| val metadataImage = imageFromTopics(topicsDelta.apply())
| replicaManager.applyDelta(topicsDelta, metadataImage)
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.emptySet()) |
| |
| assertEquals(HostedPartition.Offline, replicaManager.getPartition(topicPartition)) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(enableRemoteStorage: Boolean): Unit = { |
| val localId = 1 |
| val otherId = localId + 1 |
| val topicPartition = new TopicPartition("foo", 0) |
| |
| val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager]) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| brokerId = localId, |
| mockReplicaFetcherManager = Some(mockReplicaFetcherManager), |
| enableRemoteStorage = enableRemoteStorage |
| ) |
| |
| try { |
| // The first call to removeFetcherForPartitions should be ignored. |
| when(mockReplicaFetcherManager.removeFetcherForPartitions( |
| Set(topicPartition)) |
| ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) |
| |
| // Make the local replica the follower |
| var followerTopicsDelta = topicsCreateDelta(localId, false) |
| var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check the state of that partition |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| assertEquals(0, followerPartition.localLogOrException.logEndOffset) |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Verify that addFetcherForPartitions was called with the correct |
| // init offset. |
| verify(mockReplicaFetcherManager) |
| .addFetcherForPartitions( |
| Map(topicPartition -> InitialFetchState( |
| topicId = Some(FOO_UUID), |
| leader = BrokerEndPoint(otherId, "localhost", 9093), |
| currentLeaderEpoch = 0, |
| initOffset = 0 |
| )) |
| ) |
| |
| // The second call to removeFetcherForPartitions simulates the case
| // where the fetcher writes to the log before being shut down.
| when(mockReplicaFetcherManager.removeFetcherForPartitions( |
| Set(topicPartition)) |
| ).thenAnswer { _ => |
| replicaManager.getPartition(topicPartition) match { |
| case HostedPartition.Online(partition) => |
| partition.appendRecordsToFollowerOrFutureReplica( |
| records = MemoryRecords.withRecords(CompressionType.NONE, 0, |
| new SimpleRecord("first message".getBytes)), |
| isFuture = false |
| ) |
| |
| case _ => |
| } |
| |
| Map.empty[TopicPartition, PartitionFetchState] |
| } |
| |
| // Apply changes that bump the leader epoch.
| followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false) |
| followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| assertFalse(followerPartition.isLeader) |
| assertEquals(1, followerPartition.getLeaderEpoch) |
| assertEquals(1, followerPartition.localLogOrException.logEndOffset) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| } |
| |
| // Verify that addFetcherForPartitions was called with the correct |
| // init offset. |
| verify(mockReplicaFetcherManager) |
| .addFetcherForPartitions( |
| Map(topicPartition -> InitialFetchState( |
| topicId = Some(FOO_UUID), |
| leader = BrokerEndPoint(otherId, "localhost", 9093), |
| currentLeaderEpoch = 1, |
| initOffset = 1 |
| )) |
| ) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @Test |
| def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithZkPath(): Unit = { |
| val localId = 0 |
| val topicPartition = new TopicPartition("foo", 0) |
| |
| val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager]) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| brokerId = localId, |
| aliveBrokerIds = Seq(localId, localId + 1, localId + 2), |
| mockReplicaFetcherManager = Some(mockReplicaFetcherManager) |
| ) |
| |
| try { |
| when(mockReplicaFetcherManager.removeFetcherForPartitions( |
| Set(topicPartition)) |
| ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) |
| |
| // Make the local replica the follower. |
| var request = makeLeaderAndIsrRequest( |
| topicId = FOO_UUID, |
| topicPartition = topicPartition, |
| replicas = Seq(localId, localId + 1), |
| leaderAndIsr = LeaderAndIsr( |
| leader = localId + 1, |
| leaderEpoch = 0, |
| isr = List(localId, localId + 1), |
| leaderRecoveryState = LeaderRecoveryState.RECOVERED, |
| partitionEpoch = 0 |
| ) |
| ) |
| |
| replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ()) |
| |
| // Check the state of that partition. |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| assertEquals(0, followerPartition.getPartitionEpoch) |
| |
| // Verify that the fetcher was removed and added back.
| verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition)) |
| verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> InitialFetchState( |
| topicId = Some(FOO_UUID), |
| leader = BrokerEndPoint(localId + 1, s"host${localId + 1}", localId + 1), |
| currentLeaderEpoch = 0, |
| initOffset = 0 |
| ))) |
| |
| reset(mockReplicaFetcherManager) |
| |
| // Apply changes that bump the partition epoch.
| request = makeLeaderAndIsrRequest( |
| topicId = FOO_UUID, |
| topicPartition = topicPartition, |
| replicas = Seq(localId, localId + 1, localId + 2), |
| leaderAndIsr = LeaderAndIsr( |
| leader = localId + 1, |
| leaderEpoch = 0, |
| isr = List(localId, localId + 1), |
| leaderRecoveryState = LeaderRecoveryState.RECOVERED, |
| partitionEpoch = 1 |
| ) |
| ) |
| |
| replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ()) |
| |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| // Partition updates are fenced based on the leader epoch on the ZK path.
| assertEquals(0, followerPartition.getPartitionEpoch) |
| |
| // As the update is fenced based on the leader epoch, removeFetcherForPartitions and |
| // addFetcherForPartitions are not called at all. |
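| verify(mockReplicaFetcherManager, never()).removeFetcherForPartitions(any())
| verify(mockReplicaFetcherManager, never()).addFetcherForPartitions(any())
| 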
| reset(mockReplicaFetcherManager) |
| |
| // Apply changes that bump the leader epoch.
| request = makeLeaderAndIsrRequest( |
| topicId = FOO_UUID, |
| topicPartition = topicPartition, |
| replicas = Seq(localId, localId + 1, localId + 2), |
| leaderAndIsr = LeaderAndIsr( |
| leader = localId + 2, |
| leaderEpoch = 1, |
| isr = List(localId, localId + 1, localId + 2), |
| leaderRecoveryState = LeaderRecoveryState.RECOVERED, |
| partitionEpoch = 2 |
| ) |
| ) |
| |
| replicaManager.becomeLeaderOrFollower(0, request, (_, _) => ()) |
| |
| assertFalse(followerPartition.isLeader) |
| assertEquals(1, followerPartition.getLeaderEpoch) |
| assertEquals(2, followerPartition.getPartitionEpoch) |
| |
| // Verify that the fetcher was removed and added back.
| verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition)) |
| verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> InitialFetchState( |
| topicId = Some(FOO_UUID), |
| leader = BrokerEndPoint(localId + 2, s"host${localId + 2}", localId + 2), |
| currentLeaderEpoch = 1, |
| initOffset = 0 |
| ))) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testFetcherAreNotRestartedIfLeaderEpochIsNotBumpedWithKRaftPath(enableRemoteStorage: Boolean): Unit = { |
| val localId = 0 |
| val topicPartition = new TopicPartition("foo", 0) |
| |
| val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager]) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| brokerId = localId, |
| mockReplicaFetcherManager = Some(mockReplicaFetcherManager), |
| enableRemoteStorage = enableRemoteStorage |
| ) |
| |
| try { |
| when(mockReplicaFetcherManager.removeFetcherForPartitions( |
| Set(topicPartition)) |
| ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) |
| |
| // Make the local replica the follower. |
| var followerTopicsDelta = new TopicsDelta(TopicsImage.EMPTY) |
| followerTopicsDelta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) |
| followerTopicsDelta.replay(new PartitionRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1)) |
| .setIsr(util.Arrays.asList(localId, localId + 1)) |
| .setRemovingReplicas(Collections.emptyList()) |
| .setAddingReplicas(Collections.emptyList()) |
| .setLeader(localId + 1) |
| .setLeaderEpoch(0) |
| .setPartitionEpoch(0) |
| ) |
| var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| // Check the state of that partition. |
| val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| assertEquals(0, followerPartition.getPartitionEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Verify that the fetcher was removed and added back.
| verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition)) |
| verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> InitialFetchState( |
| topicId = Some(FOO_UUID), |
| leader = BrokerEndPoint(localId + 1, "localhost", 9093), |
| currentLeaderEpoch = 0, |
| initOffset = 0 |
| ))) |
| |
| reset(mockReplicaFetcherManager) |
| |
| // Apply changes that bump the partition epoch.
| followerTopicsDelta = new TopicsDelta(followerMetadataImage.topics()) |
| followerTopicsDelta.replay(new PartitionChangeRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2)) |
| .setIsr(util.Arrays.asList(localId, localId + 1)) |
| ) |
| followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| assertFalse(followerPartition.isLeader) |
| assertEquals(0, followerPartition.getLeaderEpoch) |
| assertEquals(1, followerPartition.getPartitionEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Verify that the partition's fetcher was not impacted.
| verify(mockReplicaFetcherManager, never()).removeFetcherForPartitions(any()) |
| verify(mockReplicaFetcherManager, never()).addFetcherForPartitions(any()) |
| |
| reset(mockReplicaFetcherManager) |
| |
| // Apply changes that bump the leader epoch.
| followerTopicsDelta = new TopicsDelta(followerMetadataImage.topics()) |
| followerTopicsDelta.replay(new PartitionChangeRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1, localId + 2)) |
| .setIsr(util.Arrays.asList(localId, localId + 1, localId + 2)) |
| .setLeader(localId + 2) |
| ) |
| |
| followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) |
| replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) |
| |
| assertFalse(followerPartition.isLeader) |
| assertEquals(1, followerPartition.getLeaderEpoch) |
| assertEquals(2, followerPartition.getPartitionEpoch) |
| |
| if (enableRemoteStorage) { |
| verifyRLMOnLeadershipChange(Collections.emptySet(), Collections.singleton(followerPartition)) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Verify that the fetcher was removed and added back.
| verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(topicPartition)) |
| verify(mockReplicaFetcherManager).addFetcherForPartitions(Map(topicPartition -> InitialFetchState( |
| topicId = Some(FOO_UUID), |
| leader = BrokerEndPoint(localId + 2, "localhost", 9093), |
| currentLeaderEpoch = 1, |
| initOffset = 0 |
| ))) |
| } finally { |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testReplicasAreStoppedWhileInControlledShutdownWithKRaft(enableRemoteStorage: Boolean): Unit = { |
| val localId = 0 |
| val foo0 = new TopicPartition("foo", 0) |
| val foo1 = new TopicPartition("foo", 1) |
| val foo2 = new TopicPartition("foo", 2) |
| |
| val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager]) |
| val isShuttingDown = new AtomicBoolean(false) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| brokerId = localId, |
| mockReplicaFetcherManager = Some(mockReplicaFetcherManager), |
| isShuttingDown = isShuttingDown, |
| enableRemoteStorage = enableRemoteStorage |
| ) |
| |
| try { |
| when(mockReplicaFetcherManager.removeFetcherForPartitions( |
| Set(foo0, foo1)) |
| ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) |
| |
| var topicsDelta = new TopicsDelta(TopicsImage.EMPTY) |
| topicsDelta.replay(new TopicRecord() |
| .setName("foo") |
| .setTopicId(FOO_UUID) |
| ) |
| |
| // foo0 is a follower in the ISR. |
| topicsDelta.replay(new PartitionRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1)) |
| .setIsr(util.Arrays.asList(localId, localId + 1)) |
| .setLeader(localId + 1) |
| .setLeaderEpoch(0) |
| .setPartitionEpoch(0) |
| ) |
| |
| // foo1 is a leader with only itself in the ISR.
| topicsDelta.replay(new PartitionRecord() |
| .setPartitionId(1) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1)) |
| .setIsr(util.Arrays.asList(localId)) |
| .setLeader(localId) |
| .setLeaderEpoch(0) |
| .setPartitionEpoch(0) |
| ) |
| |
| // foo2 is a follower NOT in the ISR. |
| topicsDelta.replay(new PartitionRecord() |
| .setPartitionId(2) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1)) |
| .setIsr(util.Arrays.asList(localId + 1)) |
| .setLeader(localId + 1) |
| .setLeaderEpoch(0) |
| .setPartitionEpoch(0) |
| ) |
| |
| // Apply the delta. |
| var metadataImage = imageFromTopics(topicsDelta.apply()) |
| replicaManager.applyDelta(topicsDelta, metadataImage) |
| |
| // Check the state of the partitions. |
| val HostedPartition.Online(fooPartition0) = replicaManager.getPartition(foo0) |
| assertFalse(fooPartition0.isLeader) |
| assertEquals(0, fooPartition0.getLeaderEpoch) |
| assertEquals(0, fooPartition0.getPartitionEpoch) |
| |
| val HostedPartition.Online(fooPartition1) = replicaManager.getPartition(foo1) |
| assertTrue(fooPartition1.isLeader) |
| assertEquals(0, fooPartition1.getLeaderEpoch) |
| assertEquals(0, fooPartition1.getPartitionEpoch) |
| |
| val HostedPartition.Online(fooPartition2) = replicaManager.getPartition(foo2) |
| assertFalse(fooPartition2.isLeader) |
| assertEquals(0, fooPartition2.getLeaderEpoch) |
| assertEquals(0, fooPartition2.getPartitionEpoch) |
| |
| if (enableRemoteStorage) { |
| val followers: util.Set[Partition] = new util.HashSet[Partition]() |
| followers.add(fooPartition0) |
| followers.add(fooPartition2) |
| verifyRLMOnLeadershipChange(Collections.singleton(fooPartition1), followers) |
| reset(mockRemoteLogManager) |
| } |
| |
| reset(mockReplicaFetcherManager) |
| |
| // The broker transitions to SHUTTING_DOWN state. This should not have |
| // any impact in KRaft mode. |
| isShuttingDown.set(true) |
| |
| // The replica begins the controlled shutdown. |
| replicaManager.beginControlledShutdown() |
| |
| // When the controller receives the controlled shutdown |
| // request, it does the following: |
| // - Shrinks the ISR of foo0 to remove this replica. |
| // - Sets the leader of foo1 to NO_LEADER because it cannot elect another leader. |
| // - Does nothing for foo2 because this replica is not in the ISR. |
| topicsDelta = new TopicsDelta(metadataImage.topics()) |
| topicsDelta.replay(new PartitionChangeRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1)) |
| .setIsr(util.Arrays.asList(localId + 1)) |
| .setLeader(localId + 1) |
| ) |
| topicsDelta.replay(new PartitionChangeRecord() |
| .setPartitionId(1) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(localId, localId + 1)) |
| .setIsr(util.Arrays.asList(localId)) |
| .setLeader(NO_LEADER) |
| ) |
| metadataImage = imageFromTopics(topicsDelta.apply()) |
| replicaManager.applyDelta(topicsDelta, metadataImage) |
| |
| // Partitions foo0 and foo1 are updated.
| assertFalse(fooPartition0.isLeader) |
| assertEquals(1, fooPartition0.getLeaderEpoch) |
| assertEquals(1, fooPartition0.getPartitionEpoch) |
| assertFalse(fooPartition1.isLeader) |
| assertEquals(1, fooPartition1.getLeaderEpoch) |
| assertEquals(1, fooPartition1.getPartitionEpoch) |
| |
| // Partition foo2 is not. |
| assertFalse(fooPartition2.isLeader) |
| assertEquals(0, fooPartition2.getLeaderEpoch) |
| assertEquals(0, fooPartition2.getPartitionEpoch) |
| |
| if (enableRemoteStorage) { |
| val followers: util.Set[Partition] = new util.HashSet[Partition]() |
| followers.add(fooPartition0) |
| followers.add(fooPartition1) |
| verifyRLMOnLeadershipChange(Collections.emptySet(), followers) |
| reset(mockRemoteLogManager) |
| } |
| |
| // Fetchers for foo0 and foo1 are stopped.
| verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(foo0, foo1)) |
| } finally { |
| // Fetcher for foo2 is stopped when the replica manager shuts down |
| // because this replica was not in the ISR. |
| replicaManager.shutdown() |
| } |
| |
| TestUtils.assertNoNonDaemonThreads(this.getClass.getName) |
| } |
| |
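| // Verifies that a partition listener can only be registered once the partition exists, is
| // notified when the high watermark advances, and stops receiving updates after removal.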
| @Test |
| def testPartitionListener(): Unit = { |
| val maxFetchBytes = 1024 * 1024 |
| val aliveBrokersIds = Seq(0, 1) |
| val leaderEpoch = 5 |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| brokerId = 0, aliveBrokersIds) |
| try { |
| val tp = new TopicPartition(topic, 0) |
| val tidp = new TopicIdPartition(topicId, tp) |
| val replicas = aliveBrokersIds.toList.map(Int.box).asJava |
| |
| val listener = new MockPartitionListener |
| listener.verify() |
| |
| // Registering a listener should fail because the partition does not exist yet. |
| assertFalse(replicaManager.maybeAddListener(tp, listener)) |
| |
| // Broker 0 becomes leader of the partition |
| val leaderAndIsrPartitionState = new LeaderAndIsrPartitionState() |
| .setTopicName(topic) |
| .setPartitionIndex(0) |
| .setControllerEpoch(0) |
| .setLeader(0) |
| .setLeaderEpoch(leaderEpoch) |
| .setIsr(replicas) |
| .setPartitionEpoch(0) |
| .setReplicas(replicas) |
| .setIsNew(true) |
| val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, |
| Seq(leaderAndIsrPartitionState).asJava, |
| Collections.singletonMap(topic, topicId), |
| Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() |
| val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) |
| assertEquals(Errors.NONE, leaderAndIsrResponse.error) |
| |
| // Registering it should succeed now. |
| assertTrue(replicaManager.maybeAddListener(tp, listener)) |
| listener.verify() |
| |
| // Leader appends some data |
| for (i <- 1 to 5) { |
| appendRecords(replicaManager, tp, TestUtils.singletonRecords(s"message $i".getBytes)).onFire { response => |
| assertEquals(Errors.NONE, response.error) |
| } |
| } |
| |
| // Follower fetches up to offset 2. |
| fetchPartitionAsFollower( |
| replicaManager, |
| tidp, |
| new FetchRequest.PartitionData( |
| Uuid.ZERO_UUID, |
| 2L, |
| 0L, |
| maxFetchBytes, |
| Optional.of(leaderEpoch) |
| ), |
| replicaId = 1 |
| ) |
| |
| // Listener is updated. |
| listener.verify(expectedHighWatermark = 2L) |
| |
| // Listener is removed. |
| replicaManager.removeListener(tp, listener) |
| |
| // Follower fetches up to offset 4. |
| fetchPartitionAsFollower( |
| replicaManager, |
| tidp, |
| new FetchRequest.PartitionData( |
| Uuid.ZERO_UUID, |
| 4L, |
| 0L, |
| maxFetchBytes, |
| Optional.of(leaderEpoch) |
| ), |
| replicaId = 1 |
| ) |
| |
| // Listener is not updated anymore. |
| listener.verify() |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
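| // Builds a TopicsDelta on an empty image that creates topic "foo" (FOO_UUID) with a single
| // partition whose replicas and ISR are {startId, startId + 1}; the leader is startId when
| // isStartIdLeader is true, otherwise startId + 1.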
| private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = { |
| val leader = if (isStartIdLeader) startId else startId + 1 |
| val delta = new TopicsDelta(TopicsImage.EMPTY) |
| delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) |
| delta.replay( |
| new PartitionRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(startId, startId + 1)) |
| .setIsr(util.Arrays.asList(startId, startId + 1)) |
| .setRemovingReplicas(Collections.emptyList()) |
| .setAddingReplicas(Collections.emptyList()) |
| .setLeader(leader) |
| .setLeaderEpoch(0) |
| .setPartitionEpoch(0) |
| ) |
| |
| delta |
| } |
| |
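| // Builds a TopicsDelta on top of the given image that updates partition 0 of topic "foo" to
| // replicas {startId, startId + 1}; setting the leader bumps the leader epoch.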
| private def topicsChangeDelta(topicsImage: TopicsImage, startId: Int, isStartIdLeader: Boolean): TopicsDelta = { |
| val leader = if (isStartIdLeader) startId else startId + 1 |
| val delta = new TopicsDelta(topicsImage) |
| delta.replay( |
| new PartitionChangeRecord() |
| .setPartitionId(0) |
| .setTopicId(FOO_UUID) |
| .setReplicas(util.Arrays.asList(startId, startId + 1)) |
| .setIsr(util.Arrays.asList(startId, startId + 1)) |
| .setLeader(leader) |
| ) |
| delta |
| } |
| |
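| // Builds a TopicsDelta on top of the given image that removes topic "foo" (FOO_UUID).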
| private def topicsDeleteDelta(topicsImage: TopicsImage): TopicsDelta = { |
| val delta = new TopicsDelta(topicsImage) |
| delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID)) |
| |
| delta |
| } |
| |
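| // Wraps the given TopicsImage into a full MetadataImage, filling the remaining sections with
| // empty or test-only defaults.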
| private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = { |
| new MetadataImage( |
| new MetadataProvenance(100L, 10, 1000L), |
| FeaturesImage.EMPTY, |
| ClusterImageTest.IMAGE1, |
| topicsImage, |
| ConfigurationsImage.EMPTY, |
| ClientQuotasImage.EMPTY, |
| ProducerIdsImage.EMPTY, |
| AclsImage.EMPTY, |
| ScramImage.EMPTY |
| ) |
| } |
| |
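| // Asserts that a fetcher exists for the given partition and that its fetch state carries the
| // expected topic ID.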
| def assertFetcherHasTopicId[T <: AbstractFetcherThread](manager: AbstractFetcherManager[T], |
| tp: TopicPartition, |
| expectedTopicId: Option[Uuid]): Unit = { |
| val fetchState = manager.getFetcher(tp).flatMap(_.fetchState(tp)) |
| assertTrue(fetchState.isDefined) |
| assertEquals(expectedTopicId, fetchState.get.topicId) |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testPartitionFetchStateUpdatesWithTopicIdChanges(startsWithTopicId: Boolean): Unit = { |
| val aliveBrokersIds = Seq(0, 1) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), |
| brokerId = 0, aliveBrokersIds) |
| try { |
| val tp = new TopicPartition(topic, 0) |
| val leaderAndIsr = LeaderAndIsr(1, aliveBrokersIds.toList) |
| |
| // This test either starts with a topic ID in the PartitionFetchState and removes it on the next request (startsWithTopicId) |
| // or does not start with a topic ID in the PartitionFetchState and adds one on the next request (!startsWithTopicId) |
| val startingId = if (startsWithTopicId) topicId else Uuid.ZERO_UUID |
| val startingIdOpt = if (startsWithTopicId) Some(topicId) else None |
| val leaderAndIsrRequest1 = makeLeaderAndIsrRequest(startingId, tp, aliveBrokersIds, leaderAndIsr) |
| val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) |
| assertEquals(Errors.NONE, leaderAndIsrResponse1.error) |
| |
| assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, startingIdOpt) |
| |
| val endingId = if (!startsWithTopicId) topicId else Uuid.ZERO_UUID |
| val endingIdOpt = if (!startsWithTopicId) Some(topicId) else None |
| val leaderAndIsrRequest2 = makeLeaderAndIsrRequest(endingId, tp, aliveBrokersIds, leaderAndIsr) |
| val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest2, (_, _) => ()) |
| assertEquals(Errors.NONE, leaderAndIsrResponse2.error) |
| |
| assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, endingIdOpt) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
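| // Verifies that alterReplicaLogDirs propagates the topic ID (or its absence, for older
| // LeaderAndIsr versions) to the future log and to the alter-log-dirs fetcher.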
| @ParameterizedTest |
| @ValueSource(booleans = Array(true, false)) |
| def testReplicaAlterLogDirsWithAndWithoutIds(usesTopicIds: Boolean): Unit = { |
| val tp = new TopicPartition(topic, 0) |
| val version = if (usesTopicIds) LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION else 4.toShort |
| val topicId = if (usesTopicIds) this.topicId else Uuid.ZERO_UUID |
| val topicIdOpt = if (usesTopicIds) Some(topicId) else None |
| |
| val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager]) |
| val replicaManager = setupReplicaManagerWithMockedPurgatories( |
| timer = new MockTimer(time), |
| mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager) |
| ) |
| |
| try { |
| replicaManager.createPartition(tp).createLogIfNotExists( |
| isNew = false, |
| isFutureReplica = false, |
| offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), |
| topicId = None |
| ) |
| |
| val leaderAndIsrRequest = makeLeaderAndIsrRequest( |
| topicId = topicId, |
| topicPartition = tp, |
| replicas = Seq(0, 1), |
| leaderAndIsr = LeaderAndIsr(0, List(0, 1)), |
| version = version |
| ) |
| replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) |
| |
| // Move the replica to the second log directory. |
| val partition = replicaManager.getPartitionOrException(tp) |
| val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).head |
| replicaManager.alterReplicaLogDirs(Map(tp -> newReplicaFolder.getAbsolutePath)) |
| |
| // Make sure the future log is created with the correct topic ID. |
| val futureLog = replicaManager.futureLocalLogOrException(tp) |
| assertEquals(topicIdOpt, futureLog.topicId) |
| |
| // Verify that addFetcherForPartitions was called with the correct topic ID. |
| verify(mockReplicaAlterLogDirsManager, times(1)) |
| .addFetcherForPartitions(Map(tp -> InitialFetchState( |
| topicId = topicIdOpt, |
| leader = BrokerEndPoint(0, "localhost", -1), |
| currentLeaderEpoch = 0, |
| initOffset = 0 |
| ))) |
| } finally { |
| replicaManager.shutdown(checkpointHW = false) |
| } |
| } |
| |
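| // Verifies that describeLogDirs returns one entry per live log directory, each without error
| // and with sensible total/usable byte counts.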
| @Test |
| def testDescribeLogDirs(): Unit = { |
| val topicPartition = 0 |
| val topicId = Uuid.randomUuid() |
| val followerBrokerId = 0 |
| val leaderBrokerId = 1 |
| val leaderEpoch = 1 |
| val leaderEpochIncrement = 2 |
| val countDownLatch = new CountDownLatch(1) |
| val offsetFromLeader = 5 |
| |
| // Prepare the mocked components for the test |
| val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time), |
| topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, |
| expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId)) |
| |
| val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition))) |
| assertEquals(mockLogMgr.liveLogDirs.size, responses.size) |
| responses.foreach { response => |
| assertEquals(Errors.NONE.code, response.errorCode) |
| assertTrue(response.totalBytes > 0) |
| assertTrue(response.usableBytes >= 0) |
| } |
| } |
| } |
| |
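| // A ReplicaSelector stub that always selects the leader while recording the number of
| // invocations and the last PartitionView passed to select().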
| class MockReplicaSelector extends ReplicaSelector { |
| |
| private val selectionCount = new AtomicLong() |
| private var partitionViewArgument: Option[PartitionView] = None |
| |
| def getSelectionCount: Long = selectionCount.get |
| def getPartitionViewArgument: Option[PartitionView] = partitionViewArgument |
| |
| override def select(topicPartition: TopicPartition, clientMetadata: ClientMetadata, partitionView: PartitionView): Optional[ReplicaView] = { |
| selectionCount.incrementAndGet() |
| partitionViewArgument = Some(partitionView) |
| Optional.of(partitionView.leader) |
| } |
| } |