KAFKA-15719: Add KRaft support in OffsetsForLeaderEpochRequestTest (#15049)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 12d2fb3..00ca644 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -17,8 +17,7 @@
package kafka.server
import java.util.Optional
-
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
@@ -27,14 +26,16 @@
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
- @Test
- def testOffsetsForLeaderEpochErrorCodes(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testOffsetsForLeaderEpochErrorCodes(quorum: String): Unit = {
val topic = "topic"
val partition = new TopicPartition(topic, 0)
val epochs = offsetForLeaderTopicCollectionFor(partition, 0, RecordBatch.NO_PARTITION_LEADER_EPOCH)
@@ -43,24 +44,26 @@
ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build()
// Unknown topic
- val randomBrokerId = servers.head.config.brokerId
+ val randomBrokerId = brokers.head.config.brokerId
assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, request)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers)
- val replicas = zkClient.getReplicasForPartition(partition).toSet
+ val partitionToLeader = createTopic(topic, numPartitions = 1, replicationFactor = 2)
+ val topicDescription = createAdminClient().describeTopics(Seq(partition.topic()).asJava).allTopicNames().get()
+ val replicas = topicDescription.get(partition.topic()).partitions().get(partition.partition()).replicas().asScala.map(_.id()).toSet
val leader = partitionToLeader(partition.partition)
val follower = replicas.find(_ != leader).get
- val nonReplica = servers.map(_.config.brokerId).find(!replicas.contains(_)).get
+ val nonReplica = brokers.map(_.config.brokerId).find(!replicas.contains(_)).get
assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, request)
assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, request)
}
- @Test
- def testCurrentEpochValidation(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCurrentEpochValidation(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+ val partitionToLeader = createTopic(topic, numPartitions = 1, replicationFactor = 3)
val firstLeaderId = partitionToLeader(topicPartition.partition)
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
@@ -76,15 +79,15 @@
killBroker(firstLeaderId)
// Check leader error codes
- val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
- val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+ val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+ val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, secondLeaderId, Optional.of(secondLeaderEpoch - 1))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, secondLeaderId, Optional.of(secondLeaderEpoch + 1))
// Check follower error codes
- val followerId = TestUtils.findFollowerId(topicPartition, servers)
+ val followerId = TestUtils.findFollowerId(topicPartition, brokers)
assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))