KAFKA-15719: Add KRaft support in OffsetsForLeaderEpochRequestTest (#15049)


Reviewers: Mickael Maison <mickael.maison@gmail.com>
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 12d2fb3..00ca644 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -17,8 +17,7 @@
 package kafka.server
 
 import java.util.Optional
-
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
@@ -27,14 +26,16 @@
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
 
 class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
 
-  @Test
-  def testOffsetsForLeaderEpochErrorCodes(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testOffsetsForLeaderEpochErrorCodes(quorum: String): Unit = {
     val topic = "topic"
     val partition = new TopicPartition(topic, 0)
     val epochs = offsetForLeaderTopicCollectionFor(partition, 0, RecordBatch.NO_PARTITION_LEADER_EPOCH)
@@ -43,24 +44,26 @@
       ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build()
 
     // Unknown topic
-    val randomBrokerId = servers.head.config.brokerId
+    val randomBrokerId = brokers.head.config.brokerId
     assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, request)
 
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers)
-    val replicas = zkClient.getReplicasForPartition(partition).toSet
+    val partitionToLeader = createTopic(topic, numPartitions = 1, replicationFactor = 2)
+    val topicDescription = createAdminClient().describeTopics(Seq(partition.topic()).asJava).allTopicNames().get()
+    val replicas = topicDescription.get(partition.topic()).partitions().get(partition.partition()).replicas().asScala.map(_.id()).toSet
     val leader = partitionToLeader(partition.partition)
     val follower = replicas.find(_ != leader).get
-    val nonReplica = servers.map(_.config.brokerId).find(!replicas.contains(_)).get
+    val nonReplica = brokers.map(_.config.brokerId).find(!replicas.contains(_)).get
 
     assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, request)
     assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, request)
   }
 
-  @Test
-  def testCurrentEpochValidation(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCurrentEpochValidation(quorum: String): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(topic, numPartitions = 1, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(topicPartition.partition)
 
     def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
@@ -76,15 +79,15 @@
     killBroker(firstLeaderId)
 
     // Check leader error codes
-    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
-    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, secondLeaderId, Optional.of(secondLeaderEpoch - 1))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, secondLeaderId, Optional.of(secondLeaderEpoch + 1))
 
     // Check follower error codes
-    val followerId = TestUtils.findFollowerId(topicPartition, servers)
+    val followerId = TestUtils.findFollowerId(topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))