MINOR: Fix error in testRestartBrokerNoErrorIfMissingPartitionMetadata (#16216)
Reviewers: Igor Soarez <soarez@apple.com>
diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
index 709454b..535db61 100644
--- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
+++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
@@ -18,11 +18,11 @@
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.RaftClusterInvocationContext;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -40,6 +40,7 @@
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
+import scala.jdk.javaapi.CollectionConverters;
import java.io.IOException;
import java.time.Duration;
@@ -49,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -66,15 +68,20 @@
this.cluster = cluster;
}
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3)
+ @ClusterTests({
+ @ClusterTest(clusterType = Type.KRAFT, brokers = 3),
+ @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3)
+ })
public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
RaftClusterInvocationContext.RaftClusterInstance raftInstance =
(RaftClusterInvocationContext.RaftClusterInstance) cluster;
try (Admin admin = cluster.createAdminClient()) {
- admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get();
+ kafka.utils.TestUtils.createTopicWithAdmin(admin, "foo",
+ CollectionConverters.asScala(raftInstance.brokers().iterator()).toSeq(),
+ CollectionConverters.asScala(raftInstance.controllers().iterator()).toSeq(),
+ 1, 3, CollectionConverters.asScala(Collections.emptyMap()), new Properties());
}
- cluster.waitForTopic("foo", 1);
Optional<PartitionMetadataFile> partitionMetadataFile = Optional.ofNullable(
raftInstance.getUnderlying().brokers().get(0).logManager()