MINOR: Fix error in testRestartBrokerNoErrorIfMissingPartitionMetadata (#16216)


Reviewers: Igor Soarez <soarez@apple.com>
diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
index 709454b..535db61 100644
--- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
+++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
@@ -18,11 +18,11 @@
 
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import kafka.test.junit.RaftClusterInvocationContext;
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -40,6 +40,7 @@
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
+import scala.jdk.javaapi.CollectionConverters;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -49,6 +50,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
@@ -66,15 +68,20 @@
         this.cluster = cluster;
     }
 
-    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT, brokers = 3),
+            @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3)
+    })
     public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
         RaftClusterInvocationContext.RaftClusterInstance raftInstance =
                 (RaftClusterInvocationContext.RaftClusterInstance) cluster;
 
         try (Admin admin = cluster.createAdminClient()) {
-            admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get();
+            kafka.utils.TestUtils.createTopicWithAdmin(admin, "foo",
+                    CollectionConverters.asScala(raftInstance.brokers().iterator()).toSeq(),
+                    CollectionConverters.asScala(raftInstance.controllers().iterator()).toSeq(),
+                    1, 3, CollectionConverters.asScala(Collections.emptyMap()), new Properties());
         }
-        cluster.waitForTopic("foo", 1);
 
         Optional<PartitionMetadataFile> partitionMetadataFile = Optional.ofNullable(
                 raftInstance.getUnderlying().brokers().get(0).logManager()