KAFKA-14248; Fix flaky test PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs (#12669)

The test fails intermittently because we do not wait for the altered config (`LogRetentionTimeMillisProp`) to propagate to all brokers before proceeding with the test.
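
For the ZooKeeper case the patch adds the usual `TestUtils.waitUntilTrue` poll over each broker's effective config (in KRaft mode `ensureConsistentKRaftMetadata()` is used instead). A minimal sketch of that check, assuming the test's existing `brokers` field and imports, with the retention value the test sets:

```scala
// Poll until every broker reports the dynamically altered value in its effective config.
TestUtils.waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
  KafkaConfig.LogRetentionTimeMillisProp, "").toString.equals("10800000")),
  s"Timed out waiting for change to ${KafkaConfig.LogRetentionTimeMillisProp}",
  waitTimeMs = 60000L)
```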

This PR makes the following changes:
1. Wait for the altered configuration to propagate to all brokers.
2. Use the existing `killBroker` utility method, which waits for shutdown via `awaitShutdown`.
3. Improve code readability by using `TestUtils.incrementalAlterConfigs` to send the alter-config requests (see the sketch after this list).
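
For reference, the condensed shape of the new alter-config calls, taken from the patch below; passing `null` with `perBrokerConfig = false` targets the cluster-wide default broker resource, while passing `brokers` with `perBrokerConfig = true` issues one entry per broker:

```scala
val newLogRetentionProperties = new Properties
newLogRetentionProperties.put(KafkaConfig.LogRetentionTimeMillisProp, "10800000")
// Cluster-wide default broker resource (empty resource name).
TestUtils.incrementalAlterConfigs(null, client, newLogRetentionProperties, perBrokerConfig = false)
  .all().get(15, TimeUnit.SECONDS)

val newLogCleanerDeleteRetention = new Properties
newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, "34")
// One per-broker resource for every broker in the cluster.
TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
  .all().get(15, TimeUnit.SECONDS)
```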

Reviewers: Jason Gustafson <jason@confluent.io>
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 7121f98..1656af0 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -33,7 +33,6 @@
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.HostResolver
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -159,22 +158,6 @@
     waitForTopics(client, List(), topics)
   }
 
-  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk")) // KRaft mode will be supported in KAFKA-13910
-  def testMetadataRefresh(quorum: String): Unit = {
-    client = Admin.create(createConfig)
-    val topics = Seq("mytopic")
-    val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
-    client.createTopics(newTopics.asJava).all.get()
-    waitForTopics(client, expectedPresent = topics, expectedMissing = List())
-
-    val controller = brokers.find(_.config.brokerId == brokers.flatMap(_.metadataCache.getControllerId).head).get
-    controller.shutdown()
-    controller.awaitShutdown()
-    val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
-    assertEquals(topics.toSet, topicDesc.keySet.asScala)
-  }
-
   /**
     * describe should not auto create topics
     */
@@ -821,10 +804,10 @@
   @ValueSource(strings = Array("zk", "kraft"))
   def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
     val leaders = createTopic(topic, replicationFactor = brokerCount)
-    val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1
+    val followerIndex = if (leaders(0) != brokers.head.config.brokerId) 0 else 1
 
     def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
-      TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition) != None,
+      TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition).isDefined,
                               "Expected follower to create replica for partition")
 
       // wait until the follower discovers that log start offset moved beyond its HW
@@ -862,6 +845,7 @@
     val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
     result1.all().get()
     restartDeadBrokers()
+    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
     waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
   }
 
@@ -1522,7 +1506,7 @@
     // Now change the preferred leader to 1
     changePreferredLeader(prefer1)
     // but shut it down...
-    brokers(1).shutdown()
+    killBroker(1)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
 
     def assertPreferredLeaderNotAvailable(
@@ -1576,9 +1560,9 @@
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     brokers(broker2).startup()
 
@@ -1610,9 +1594,9 @@
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertNoLeader(client, partition2)
     brokers(broker2).startup()
@@ -1648,9 +1632,9 @@
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
     brokers(broker2).startup()
@@ -1708,9 +1692,9 @@
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
 
     val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
@@ -1737,7 +1721,7 @@
 
     TestUtils.assertLeader(client, partition1, broker1)
 
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertLeader(client, partition1, broker2)
     brokers(broker1).startup()
 
@@ -1769,9 +1753,9 @@
     TestUtils.assertLeader(client, partition1, broker1)
     TestUtils.assertLeader(client, partition2, broker1)
 
-    brokers(broker2).shutdown()
+    killBroker(broker2)
     TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
-    brokers(broker1).shutdown()
+    killBroker(broker1)
     TestUtils.assertNoLeader(client, partition1)
     TestUtils.assertLeader(client, partition2, broker3)
     brokers(broker2).startup()
@@ -2505,7 +2489,7 @@
     val alterResult = client.incrementalAlterConfigs(Map(
       topicResource -> topicAlterConfigs
     ).asJava)
-    alterResult.all().get()
+    alterResult.all().get(15, TimeUnit.SECONDS)
 
     ensureConsistentKRaftMetadata()
     val config = client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp)
@@ -2523,19 +2507,29 @@
   def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
     client = Admin.create(super.createConfig)
 
-    val alterMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]
-    alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, ""), util.Arrays.asList(
-      new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "10800000"), OpType.SET)))
-    (brokers.map(_.config) ++ controllerServers.map(_.config)).foreach { case config =>
-      alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, config.nodeId.toString()),
-        util.Arrays.asList(new AlterConfigOp(new ConfigEntry(
-          KafkaConfig.LogCleanerDeleteRetentionMsProp, "34"), OpType.SET)))
+    val newLogRetentionProperties = new Properties
+    newLogRetentionProperties.put(KafkaConfig.LogRetentionTimeMillisProp, "10800000")
+    TestUtils.incrementalAlterConfigs(null, client, newLogRetentionProperties, perBrokerConfig = false)
+      .all().get(15, TimeUnit.SECONDS)
+
+    val newLogCleanerDeleteRetention = new Properties
+    newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, "34")
+    TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
+      .all().get(15, TimeUnit.SECONDS)
+
+    if (isKRaftTest()) {
+      ensureConsistentKRaftMetadata()
+    } else {
+      waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
+        KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
+        s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
+        waitTimeMs = 60000L)
+
+      waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
+        KafkaConfig.LogRetentionTimeMillisProp, "").toString.equals("10800000")),
+        s"Timed out waiting for change to ${KafkaConfig.LogRetentionTimeMillisProp}",
+        waitTimeMs = 60000L)
     }
-    client.incrementalAlterConfigs(alterMap).all().get()
-    waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
-      KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
-      s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
-      waitTimeMs = 60000L)
 
     val newTopics = Seq(new NewTopic("foo", Map((0: Integer) -> Seq[Integer](1, 2).asJava,
       (1: Integer) -> Seq[Integer](2, 0).asJava).asJava).