KAFKA-16677 Replace ClusterType#ALL and ClusterType#DEFAULT by Array (#15897)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 212e60d..ab0f30f 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -20,8 +20,6 @@
import kafka.cluster.EndPoint;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.ZkClusterInvocationContext;
@@ -65,7 +63,6 @@
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
@ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults
@Tag("integration")
public class ConfigCommandIntegrationTest {
AdminZkClient adminZkClient;
@@ -77,10 +74,7 @@
this.cluster = cluster;
}
- @ClusterTests({
- @ClusterTest(clusterType = Type.ZK),
- @ClusterTest(clusterType = Type.KRAFT)
- })
+ @ClusterTest(types = {Type.ZK, Type.KRAFT})
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-name", cluster.isKRaftTest() ? "0" : "1",
@@ -91,9 +85,8 @@
assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
}
- @ClusterTests({
- @ClusterTest(clusterType = Type.ZK)
- })
+
+ @ClusterTest(types = {Type.ZK})
public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "users",
@@ -164,7 +157,7 @@
verifyConfig(zkClient, Collections.emptyMap(), brokerId);
}
- @ClusterTest(clusterType = Type.ZK)
+ @ClusterTest(types = {Type.ZK})
public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
index eb89b0e..b9406f5 100644
--- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
+++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
@@ -18,8 +18,6 @@
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.Exit;
import org.apache.kafka.test.NoRetryException;
@@ -44,7 +42,6 @@
@SuppressWarnings("dontUseSystemExit")
@ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL)
public class UserScramCredentialsCommandTest {
private static final String USER1 = "user1";
private static final String USER2 = "user2";
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 65d7786..624dfae 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -24,18 +24,21 @@
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Represents an immutable requested configuration of a Kafka cluster for integration testing.
*/
public class ClusterConfig {
- private final Type type;
+ private final Set<Type> types;
private final int brokers;
private final int controllers;
private final int disksPerBroker;
@@ -56,7 +59,7 @@
private final Map<Integer, Map<String, String>> perServerProperties;
@SuppressWarnings("checkstyle:ParameterNumber")
- private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
+ private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
@@ -66,7 +69,7 @@
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero.");
- this.type = Objects.requireNonNull(type);
+ this.types = Objects.requireNonNull(types);
this.brokers = brokers;
this.controllers = controllers;
this.disksPerBroker = disksPerBroker;
@@ -85,8 +88,8 @@
this.perServerProperties = Objects.requireNonNull(perServerProperties);
}
- public Type clusterType() {
- return type;
+ public Set<Type> clusterTypes() {
+ return types;
}
public int numBrokers() {
@@ -164,7 +167,7 @@
public static Builder defaultBuilder() {
return new Builder()
- .setType(Type.ZK)
+ .setTypes(Stream.of(Type.ZK, Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
.setBrokers(1)
.setControllers(1)
.setDisksPerBroker(1)
@@ -179,7 +182,7 @@
public static Builder builder(ClusterConfig clusterConfig) {
return new Builder()
- .setType(clusterConfig.type)
+ .setTypes(clusterConfig.types)
.setBrokers(clusterConfig.brokers)
.setControllers(clusterConfig.controllers)
.setDisksPerBroker(clusterConfig.disksPerBroker)
@@ -199,7 +202,7 @@
}
public static class Builder {
- private Type type;
+ private Set<Type> types;
private int brokers;
private int controllers;
private int disksPerBroker;
@@ -219,8 +222,8 @@
private Builder() {}
- public Builder setType(Type type) {
- this.type = type;
+ public Builder setTypes(Set<Type> types) {
+ this.types = Collections.unmodifiableSet(new HashSet<>(types));
return this;
}
@@ -307,7 +310,7 @@
}
public ClusterConfig build() {
- return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
+ return new ClusterConfig(types, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties, perServerProperties);
}
diff --git a/core/src/test/java/kafka/test/ClusterConfigTest.java b/core/src/test/java/kafka/test/ClusterConfigTest.java
index 1ad1659..e979b2e 100644
--- a/core/src/test/java/kafka/test/ClusterConfigTest.java
+++ b/core/src/test/java/kafka/test/ClusterConfigTest.java
@@ -46,7 +46,7 @@
File trustStoreFile = TestUtils.tempFile();
ClusterConfig clusterConfig = ClusterConfig.builder()
- .setType(Type.KRAFT)
+ .setTypes(Collections.singleton(Type.KRAFT))
.setBrokers(3)
.setControllers(2)
.setDisksPerBroker(1)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 0259314..e7ef982 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -20,6 +20,7 @@
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
@@ -39,18 +40,10 @@
public interface ClusterInstance {
- enum ClusterType {
- ZK,
- RAFT
- }
-
- /**
- * Cluster type. For now, only ZK is supported.
- */
- ClusterType clusterType();
+ Type type();
default boolean isKRaftTest() {
- return clusterType() == ClusterType.RAFT;
+ return type() == Type.KRAFT || type() == Type.CO_KRAFT;
}
/**
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 9d03206..96ba584 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -47,7 +47,7 @@
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
-@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
+@ClusterTestDefaults(types = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
}) // Set defaults for a few params in @ClusterTest(s)
@@ -65,6 +65,7 @@
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
clusterGenerator.accept(ClusterConfig.defaultBuilder()
+ .setTypes(Collections.singleton(Type.ZK))
.setName("Generated Test")
.setServerProperties(serverProperties)
.build());
@@ -74,14 +75,14 @@
@ClusterTest
public void testClusterTest(ClusterInstance clusterInstance) {
Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
- Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType()); // From the class level default
+ Assertions.assertEquals(Type.ZK, clusterInstance.type()); // From the class level default
Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
}
// generate1 is a template method which generates any number of cluster configs
@ClusterTemplate("generate1")
public void testClusterTemplate() {
- Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType(),
+ Assertions.assertEquals(Type.ZK, clusterInstance.type(),
"generate1 provided a Zk cluster, so we should see that here");
Assertions.assertEquals("Generated Test", clusterInstance.config().name().orElse(""),
"generate1 named this cluster config, so we should see that here");
@@ -90,19 +91,19 @@
// Multiple @ClusterTest can be used with @ClusterTests
@ClusterTests({
- @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
+ @ClusterTest(name = "cluster-tests-1", types = {Type.ZK}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400
}),
- @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
+ @ClusterTest(name = "cluster-tests-2", types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
@ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300")
}),
- @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
+ @ClusterTest(name = "cluster-tests-3", types = {Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
@@ -136,7 +137,7 @@
Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
- if (clusterInstance.config().clusterType() == Type.KRAFT) {
+ if (clusterInstance.type() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
@@ -149,12 +150,8 @@
}
@ClusterTests({
- @ClusterTest(clusterType = Type.ZK),
- @ClusterTest(clusterType = Type.ZK, disksPerBroker = 2),
- @ClusterTest(clusterType = Type.KRAFT),
- @ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
- @ClusterTest(clusterType = Type.CO_KRAFT),
- @ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
+ @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
+ @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
})
public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
Admin admin = clusterInstance.createAdminClient();
@@ -178,21 +175,21 @@
}
@ClusterTests({
- @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "enable-new-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
- @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
- @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
- @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
- @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@@ -206,13 +203,13 @@
}
@ClusterTests({
- @ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "disable-new-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
}),
- @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
- @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index c3c9364..a114c3c 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -33,7 +33,7 @@
@Retention(RUNTIME)
@TestTemplate
public @interface ClusterTest {
- Type clusterType() default Type.DEFAULT;
+ Type[] types() default {};
int brokers() default 0;
int controllers() default 0;
int disksPerBroker() default 0;
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
index af45b87..5e19c6f 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
@@ -35,7 +35,7 @@
@Target({TYPE})
@Retention(RUNTIME)
public @interface ClusterTestDefaults {
- Type clusterType() default Type.ZK;
+ Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
int brokers() default 1;
int controllers() default 1;
int disksPerBroker() default 1;
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
index 27c2b79..3d35b30 100644
--- a/core/src/test/java/kafka/test/annotation/Type.java
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -45,20 +45,6 @@
public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
}
- },
- ALL {
- @Override
- public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
- invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false));
- invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true));
- invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
- }
- },
- DEFAULT {
- @Override
- public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
- throw new UnsupportedOperationException("Cannot create invocation contexts for DEFAULT type");
- }
};
public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer);
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index 6f5ad19..02371fe 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -34,6 +34,7 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -131,7 +132,11 @@
generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
String baseDisplayName = context.getRequiredTestMethod().getName();
- generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations));
+ generatedClusterConfigs.forEach(config -> {
+ for (Type type: config.clusterTypes()) {
+ type.invocationContexts(baseDisplayName, config, testInvocations);
+ }
+ });
}
private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
@@ -142,7 +147,7 @@
private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
Consumer<TestTemplateInvocationContext> testInvocations) {
- Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
+ Type[] types = annot.types().length == 0 ? defaults.types() : annot.types();
Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
.filter(e -> e.id() == -1)
.collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b));
@@ -153,7 +158,7 @@
Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
ClusterConfig config = ClusterConfig.builder()
- .setType(type)
+ .setTypes(new HashSet<>(Arrays.asList(types)))
.setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
.setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
.setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
@@ -165,7 +170,9 @@
.setSecurityProtocol(annot.securityProtocol())
.setMetadataVersion(annot.metadataVersion())
.build();
- type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
+ for (Type type : types) {
+ type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
+ }
}
private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
diff --git a/core/src/test/java/kafka/test/junit/README.md b/core/src/test/java/kafka/test/junit/README.md
index 523c585..491e0de 100644
--- a/core/src/test/java/kafka/test/junit/README.md
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -10,11 +10,11 @@
def testSomething(): Unit = { ... }
```
-This annotation has fields for cluster type and number of brokers, as well as commonly parameterized configurations.
+This annotation has fields for a set of cluster types and number of brokers, as well as commonly parameterized configurations.
Arbitrary server properties can also be provided in the annotation:
```java
-@ClusterTest(clusterType = Type.Zk, securityProtocol = "PLAINTEXT", properties = {
+@ClusterTest(types = {Type.Zk}, securityProtocol = "PLAINTEXT", properties = {
@ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
@ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
})
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 57bd796..ad4d549 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -21,6 +21,7 @@
import kafka.server.BrokerFeatures;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
+import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.testkit.KafkaClusterTestKit;
@@ -87,7 +88,7 @@
@Override
public List<Extension> getAdditionalExtensions() {
- RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig);
+ RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig, isCombined);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
@@ -131,11 +132,13 @@
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
+ private final boolean isCombined;
- RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, AtomicReference<EmbeddedZookeeper> zkReference, ClusterConfig clusterConfig) {
+ RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, AtomicReference<EmbeddedZookeeper> zkReference, ClusterConfig clusterConfig, boolean isCombined) {
this.clusterReference = clusterReference;
this.zkReference = zkReference;
this.clusterConfig = clusterConfig;
+ this.isCombined = isCombined;
}
@Override
@@ -209,8 +212,8 @@
}
@Override
- public ClusterType clusterType() {
- return ClusterType.RAFT;
+ public Type type() {
+ return isCombined ? Type.CO_KRAFT : Type.KRAFT;
}
@Override
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index ab23aed..de1cf53 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -21,6 +21,7 @@
import kafka.network.SocketServer;
import kafka.server.BrokerFeatures;
import kafka.server.KafkaServer;
+import kafka.test.annotation.Type;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.utils.EmptyTestInfo;
@@ -189,8 +190,8 @@
}
@Override
- public ClusterType clusterType() {
- return ClusterType.ZK;
+ public Type type() {
+ return Type.ZK;
}
@Override
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index b66d536..0d84892 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -45,7 +45,7 @@
java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))
clusterGenerator.accept(ClusterConfig.defaultBuilder()
- .setType(Type.ZK)
+ .setTypes(Set(Type.ZK).asJava)
.setBrokers(3)
.setAutoStart(false)
.setServerProperties(serverProperties)
@@ -61,9 +61,9 @@
class ProducerIdsIntegrationTest {
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
- new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
- new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+ new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
+ new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+ new ClusterTest(types = Array(Type.KRAFT), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)
@@ -76,7 +76,7 @@
clusterInstance.stop()
}
- @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
new ClusterConfigProperty(key = "num.io.threads", value = "1")
))
@Timeout(20)
@@ -87,7 +87,7 @@
}
@Disabled // TODO: Enable once producer id block size is configurable (KAFKA-15029)
- @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
new ClusterConfigProperty(key = "num.io.threads", value = "2")
))
def testMultipleAllocateProducerIdsRequest(clusterInstance: ClusterInstance): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 8ed6064..e0bc197 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -47,7 +47,7 @@
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class KafkaServerKRaftRegistrationTest {
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -95,7 +95,7 @@
}
}
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster.clusterId()
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5640f43..31edc3c 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -32,12 +32,12 @@
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MetadataVersionIntegrationTest {
@ClusterTests(value = Array(
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV2),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV3),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+ new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0),
+ new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV1),
+ new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV2),
+ new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV3),
+ new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0),
+ new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0)
))
def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
@@ -60,7 +60,7 @@
}, "Never saw metadata.version increase on broker")
}
- @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+ @ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
@@ -69,7 +69,7 @@
updateResult.all().get()
}
- @ClusterTest(clusterType = Type.KRAFT)
+ @ClusterTest(types = Array(Type.KRAFT))
def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 6e4cd7d..464df8a 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -84,7 +84,7 @@
.setMetadataVersion(mv)
.setBrokers(3)
.setServerProperties(serverProperties)
- .setType(Type.ZK)
+ .setTypes(Set(Type.ZK).asJava)
.build())
}
}
@@ -113,7 +113,7 @@
}
@ClusterTest(
- brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
+ brokers = 3, types = Array(Type.ZK), autoStart = AutoStart.YES,
metadataVersion = MetadataVersion.IBP_3_4_IV0,
serverProperties = Array(
new ClusterConfigProperty(key="authorizer.class.name", value="kafka.security.authorizer.AclAuthorizer"),
@@ -156,7 +156,7 @@
}
@ClusterTest(
- brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
+ brokers = 3, types = Array(Type.ZK), autoStart = AutoStart.YES,
metadataVersion = MetadataVersion.IBP_3_4_IV0,
serverProperties = Array(
new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"),
@@ -219,7 +219,7 @@
* and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK
* as would happen during a migration. The generated records are then verified.
*/
- @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+ @ClusterTest(brokers = 3, types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_4_IV0)
def testMigrate(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val newTopics = new util.ArrayList[NewTopic]()
@@ -429,7 +429,7 @@
}
// SCRAM and Quota are intermixed. Test SCRAM Only here
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -492,7 +492,7 @@
}
}
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -665,7 +665,7 @@
}
// SCRAM and Quota are intermixed. Test both here
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -730,7 +730,7 @@
}
}
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -809,7 +809,7 @@
}
}
- @ClusterTest(clusterType = Type.ZK, brokers = 4, metadataVersion = MetadataVersion.IBP_3_7_IV0, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 4, metadataVersion = MetadataVersion.IBP_3_7_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -893,7 +893,7 @@
}
}
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index 5cb5957..cea7835 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -32,7 +32,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index de5720f..c02874e 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -21,7 +21,7 @@
import org.apache.kafka.common.message.ApiVersionsRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTests, Type}
import kafka.test.junit.ClusterTestExtensions
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
@@ -30,11 +30,10 @@
// TODO: Introduce template in ClusterTests https://issues.apache.org/jira/browse/KAFKA-16595
// currently we can't apply template in ClusterTests hence we see bunch of duplicate settings in ClusterTests
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(brokers = 1)
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+ new ClusterTest(types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
// Configure control plane listener to make sure we have separate listeners for testing.
@@ -43,11 +42,7 @@
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
- new ClusterTest(clusterType = Type.CO_KRAFT, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
- new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
- )),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+ new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
)),
@@ -59,7 +54,7 @@
}
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+ new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
// Configure control plane listener to make sure we have separate listeners for testing.
@@ -68,11 +63,7 @@
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
- new ClusterTest(clusterType = Type.CO_KRAFT, serverProperties = Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
- new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
- )),
- new ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
)),
@@ -83,7 +74,7 @@
validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
}
- @ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
@@ -96,7 +87,7 @@
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get())
}
- @ClusterTest(clusterType = Type.KRAFT)
+ @ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestThroughControllerListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName.get())
@@ -104,15 +95,14 @@
}
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+ new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
- new ClusterTest(clusterType = Type.CO_KRAFT),
- new ClusterTest(clusterType = Type.KRAFT),
+ new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)),
))
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
@@ -126,7 +116,7 @@
}
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
+ new ClusterTest(types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
// Configure control plane listener to make sure we have separate listeners for testing.
@@ -135,11 +125,7 @@
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
- new ClusterTest(clusterType = Type.CO_KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
- new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
- )),
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
+ new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
)),
@@ -150,7 +136,7 @@
validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0)
}
- @ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
@@ -163,7 +149,7 @@
validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get())
}
- @ClusterTest(clusterType = Type.KRAFT)
+ @ClusterTest(types = Array(Type.KRAFT))
def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName.get())
@@ -171,15 +157,14 @@
}
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+ new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
// Configure control plane listener to make sure we have separate listeners for testing.
new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
)),
- new ClusterTest(clusterType = Type.CO_KRAFT),
- new ClusterTest(clusterType = Type.KRAFT),
+ new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)),
))
def testApiVersionsRequestValidationV3(): Unit = {
// Invalid request because Name and Version are empty by default
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index 1e3adc3..40d7a52 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.ClusterTest
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -28,7 +28,6 @@
import scala.jdk.CollectionConverters._
-@ClusterTestDefaults(clusterType = Type.ALL)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerMetricNamesTest(cluster: ClusterInstance) {
@AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 6c6199f..73cab9c 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -134,7 +134,7 @@
Errors.forCode(resp.topics().find(topicName).errorCode())
}
- @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+ @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
def testRegisterZkWithKRaftMigrationDisabled(clusterInstance: ClusterInstance): Unit = {
val clusterId = clusterInstance.clusterId()
@@ -162,7 +162,7 @@
}
}
- @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
+ @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
def testRegisterZkWith33Controller(clusterInstance: ClusterInstance): Unit = {
// Verify that a controller running an old metadata.version cannot register a ZK broker
@@ -195,7 +195,7 @@
}
@ClusterTest(
- clusterType = Type.KRAFT,
+ types = Array(Type.KRAFT),
brokers = 1,
controllers = 1,
metadataVersion = MetadataVersion.IBP_3_4_IV0,
@@ -235,7 +235,7 @@
* through the RPCs. The migration never proceeds past pre-migration since no ZK brokers are registered.
*/
@ClusterTests(Array(
- new ClusterTest(clusterType = Type.KRAFT, autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+ new ClusterTest(types = Array(Type.KRAFT), autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
))
def testNoMetadataChangesInPreMigrationMode(clusterInstance: ClusterInstance): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index ed80f80..8954c89 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -21,7 +21,7 @@
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
@@ -36,7 +36,6 @@
import scala.jdk.CollectionConverters._
-@ClusterTestDefaults(clusterType = Type.ALL)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Tag("integration")
class ClientQuotasRequestTest(cluster: ClusterInstance) {
@@ -168,7 +167,7 @@
))
}
- @ClusterTest(clusterType = Type.ZK) // No SCRAM for Raft yet
+ @ClusterTest(types = Array(Type.ZK)) // No SCRAM for Raft yet
def testClientQuotasForScramUsers(): Unit = {
val userName = "user"
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 250cc83..af67f77 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -17,7 +17,7 @@
package kafka.server
import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
import kafka.utils.TestUtils
@@ -34,7 +34,6 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
@Tag("integration")
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@@ -49,7 +48,7 @@
assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
}
- @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@@ -137,7 +136,7 @@
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
- @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@@ -251,7 +250,7 @@
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
}
- @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+ @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 03eddbe..969e3e9 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -31,7 +31,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 92cea4c..10b94c8 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -28,7 +28,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -51,7 +51,7 @@
testDeleteGroups(false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 45f967f..a18f5e6 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -30,7 +30,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -42,7 +42,7 @@
testDescribeGroups()
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index 2a94655..73b8887 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -33,11 +33,11 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class DescribeQuorumRequestTest(cluster: ClusterInstance) {
- @ClusterTest(clusterType = Type.ZK)
+ @ClusterTest(types = Array(Type.ZK))
def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
val apiRequest = new ApiVersionsRequest.Builder().build()
val apiResponse = connectAndReceive[ApiVersionsResponse](apiRequest)
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 80a308e..02f6e2d 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -34,7 +34,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -46,7 +46,7 @@
testHeartbeat()
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index 21c0de1..f626fa9 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -17,7 +17,7 @@
package kafka.server
import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
@@ -38,10 +38,9 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
@Tag("integration")
class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
- @ClusterTest(serverProperties = Array(
+ @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@@ -50,7 +49,7 @@
testJoinGroup()
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 3eaff93..47e67e5 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -28,7 +28,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -40,7 +40,7 @@
testLeaveGroup()
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index c03dffc..b992394 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -30,7 +30,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -53,7 +53,7 @@
testListGroups(false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 346b170..7791c6c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -26,7 +26,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@@ -50,7 +50,7 @@
testOffsetCommit(false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 8b4561d..8fabac4 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -26,7 +26,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -49,7 +49,7 @@
testOffsetDelete(false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 79ec402..5f51d6c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -34,7 +34,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@@ -60,7 +60,7 @@
testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
@@ -93,7 +93,7 @@
testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
@@ -126,7 +126,7 @@
testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false)
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4b7e129..adb528e 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -61,7 +61,7 @@
clusterGenerator.accept(ClusterConfig.defaultBuilder
.setSecurityProtocol(securityProtocol)
- .setType(Type.ZK)
+ .setTypes(Set(Type.ZK).asJava)
.setSaslServerProperties(saslServerProperties)
.setSaslClientProperties(saslClientProperties)
.setServerProperties(serverProperties)
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index ac0d068..b958f03 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -35,7 +35,7 @@
@Timeout(120)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
@Tag("integration")
class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(serverProperties = Array(
@@ -47,7 +47,7 @@
testSyncGroup()
}
- @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+ @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
index e04b904..8fae617 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -19,8 +19,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -49,7 +47,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL)
@Tag("integration")
public class DeleteRecordsCommandTest {
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index fef3803..a897827 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -45,7 +45,7 @@
@ExtendWith(value = ClusterTestExtensions.class)
@Tag("integration")
public class FeatureCommandTest {
- @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+ @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDescribeWithZK(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
@@ -53,7 +53,7 @@
assertEquals("", commandOutput);
}
- @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDescribeWithKRaft(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
@@ -63,7 +63,7 @@
"SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
}
- @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4)
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4)
public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe"))
@@ -73,7 +73,7 @@
"SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(commandOutput));
}
- @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+ @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testUpgradeMetadataVersionWithZk(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -83,7 +83,7 @@
"update because the provided feature is not supported.", commandOutput);
}
- @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -98,7 +98,7 @@
assertEquals("metadata.version was upgraded to 6.", commandOutput);
}
- @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+ @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDowngradeMetadataVersionWithZk(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -121,7 +121,7 @@
"update because the provided feature is not supported.", commandOutput);
}
- @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+ @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) {
String commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 231222e..42d6581 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -21,7 +21,6 @@
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
@@ -53,7 +52,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterTestDefaults(serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4")
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index e2267e5..b8842da 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -20,7 +20,6 @@
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
@@ -63,7 +62,7 @@
@SuppressWarnings("deprecation")
@ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
+@ClusterTestDefaults(brokers = 3, serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
@ClusterConfigProperty(key = "auto.leader.rebalance.enable", value = "false"),
@ClusterConfigProperty(key = "controlled.shutdown.enable", value = "true"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index aa858f4..3565036 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -18,7 +18,6 @@
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
@@ -44,7 +43,6 @@
import static java.util.Arrays.stream;
@ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.KRAFT)
@Tag("integration")
class MetadataQuorumCommandTest {
@@ -59,12 +57,9 @@
* 3. Fewer brokers than controllers
*/
@ClusterTests({
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 2),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 2),
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 1),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 2),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 2)
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
})
public void testDescribeQuorumReplicationSuccessful() throws InterruptedException {
cluster.waitForReadyBrokers();
@@ -73,7 +68,7 @@
);
List<String> outputs = stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList());
- if (cluster.config().clusterType() == Type.CO_KRAFT)
+ if (cluster.type() == Type.CO_KRAFT)
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.size());
else
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.size());
@@ -86,7 +81,7 @@
assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count());
Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Observer\\s*");
- if (cluster.config().clusterType() == Type.CO_KRAFT)
+ if (cluster.type() == Type.CO_KRAFT)
assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()),
outputs.stream().filter(o -> observerPattern.matcher(o).find()).count());
else
@@ -100,12 +95,9 @@
* 3. Fewer brokers than controllers
*/
@ClusterTests({
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 2),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 2),
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 1),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 2),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 2)
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
})
public void testDescribeQuorumStatusSuccessful() throws InterruptedException {
cluster.waitForReadyBrokers();
@@ -124,16 +116,13 @@
assertTrue(outputs[6].matches("CurrentVoters:\\s+\\[\\d+(,\\d+)*]"));
// There are no observers if we have fewer brokers than controllers
- if (cluster.config().clusterType() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers())
+ if (cluster.type() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers())
assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]"));
else
assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[\\d+(,\\d+)*]"));
}
- @ClusterTests({
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
- @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)
- })
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testOnlyOneBrokerAndOneController() {
String statusOutput = ToolsTestUtils.captureStandardOut(() ->
MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
@@ -147,9 +136,7 @@
assertEquals("0", replicationOutput.split("\n")[1].split("\\s+")[2]);
}
- @ClusterTests({
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1)
- })
+ @ClusterTest(types = {Type.CO_KRAFT})
public void testCommandConfig() throws IOException {
// specifying a --command-config containing properties that would prevent login must fail
File tmpfile = TestUtils.tempFile(AdminClientConfig.SECURITY_PROTOCOL_CONFIG + "=SSL_PLAINTEXT");
@@ -157,7 +144,7 @@
"--command-config", tmpfile.getAbsolutePath(), "describe", "--status"));
}
- @ClusterTest(clusterType = Type.ZK, brokers = 1)
+ @ClusterTest(types = {Type.ZK})
public void testDescribeQuorumInZkMode() {
assertInstanceOf(UnsupportedVersionException.class, assertThrows(
ExecutionException.class,
@@ -171,7 +158,7 @@
}
- @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1)
+ @ClusterTest(types = {Type.CO_KRAFT})
public void testHumanReadableOutput() {
assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--human-readable"));
assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status", "--human-readable"));
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index 1ffad2b..36dfab5 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static java.util.Collections.singleton;
import static kafka.test.annotation.Type.CO_KRAFT;
@@ -53,40 +55,21 @@
serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
- ClusterConfig zk = ClusterConfig.defaultBuilder()
- .setType(ZK)
+ ClusterConfig classicGroupCoordinator = ClusterConfig.defaultBuilder()
+ .setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
.setServerProperties(serverProperties)
.build();
- clusterGenerator.accept(zk);
-
- ClusterConfig raftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
- .setType(KRAFT)
- .setServerProperties(serverProperties)
- .build();
- clusterGenerator.accept(raftWithLegacyCoordinator);
-
- ClusterConfig combinedKRaftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
- .setType(CO_KRAFT)
- .setServerProperties(serverProperties)
- .build();
- clusterGenerator.accept(combinedKRaftWithLegacyCoordinator);
+ clusterGenerator.accept(classicGroupCoordinator);
// Following are test case config with new group coordinator
serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
- ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
- .setType(KRAFT)
- .setName("newGroupCoordinator")
+ ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder()
+ .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
+ .setName("consumerGroupCoordinator")
.setServerProperties(serverProperties)
.build();
- clusterGenerator.accept(raftWithNewGroupCoordinator);
-
- ClusterConfig combinedKRaftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
- .setType(CO_KRAFT)
- .setName("newGroupCoordinator")
- .setServerProperties(serverProperties)
- .build();
- clusterGenerator.accept(combinedKRaftWithNewGroupCoordinator);
+ clusterGenerator.accept(consumerGroupCoordinator);
}
static <T> AutoCloseable buildConsumers(int numberOfConsumers,