KAFKA-16677 Replace ClusterType#ALL and ClusterType#DEFAULT by Array (#15897)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 212e60d..ab0f30f 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -20,8 +20,6 @@
 import kafka.cluster.EndPoint;
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import kafka.test.junit.ZkClusterInvocationContext;
@@ -65,7 +63,6 @@
 
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults
 @Tag("integration")
 public class ConfigCommandIntegrationTest {
     AdminZkClient adminZkClient;
@@ -77,10 +74,7 @@
         this.cluster = cluster;
     }
 
-    @ClusterTests({
-        @ClusterTest(clusterType = Type.ZK),
-        @ClusterTest(clusterType = Type.KRAFT)
-    })
+    @ClusterTest(types = {Type.ZK, Type.KRAFT})
     public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-name", cluster.isKRaftTest() ? "0" : "1",
@@ -91,9 +85,7 @@
                 assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut));
     }
 
-    @ClusterTests({
-        @ClusterTest(clusterType = Type.ZK)
-    })
+    @ClusterTest(types = {Type.ZK})
     public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
         assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
             "--entity-type", "users",
@@ -164,7 +157,7 @@
         verifyConfig(zkClient, Collections.emptyMap(), brokerId);
     }
 
-    @ClusterTest(clusterType = Type.ZK)
+    @ClusterTest(types = {Type.ZK})
     public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
         cluster.shutdownBroker(0);
         String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
index eb89b0e..b9406f5 100644
--- a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
+++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
@@ -18,8 +18,6 @@
 
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.Exit;
 import org.apache.kafka.test.NoRetryException;
@@ -44,7 +42,6 @@
 
 @SuppressWarnings("dontUseSystemExit")
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL)
 public class UserScramCredentialsCommandTest {
     private static final String USER1 = "user1";
     private static final String USER2 = "user2";
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 65d7786..624dfae 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -24,18 +24,21 @@
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Represents an immutable requested configuration of a Kafka cluster for integration testing.
  */
 public class ClusterConfig {
 
-    private final Type type;
+    private final Set<Type> types;
     private final int brokers;
     private final int controllers;
     private final int disksPerBroker;
@@ -56,7 +59,7 @@
     private final Map<Integer, Map<String, String>> perServerProperties;
 
     @SuppressWarnings("checkstyle:ParameterNumber")
-    private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
+    private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
                   SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
                   MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
                   Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
@@ -66,7 +69,7 @@
         if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
         if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero.");
 
-        this.type = Objects.requireNonNull(type);
+        this.types = Objects.requireNonNull(types);
         this.brokers = brokers;
         this.controllers = controllers;
         this.disksPerBroker = disksPerBroker;
@@ -85,8 +88,8 @@
         this.perServerProperties = Objects.requireNonNull(perServerProperties);
     }
 
-    public Type clusterType() {
-        return type;
+    public Set<Type> clusterTypes() {
+        return types;
     }
 
     public int numBrokers() {
@@ -164,7 +167,7 @@
 
     public static Builder defaultBuilder() {
         return new Builder()
-                .setType(Type.ZK)
+                .setTypes(Stream.of(Type.ZK, Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
                 .setBrokers(1)
                 .setControllers(1)
                 .setDisksPerBroker(1)
@@ -179,7 +182,7 @@
 
     public static Builder builder(ClusterConfig clusterConfig) {
         return new Builder()
-                .setType(clusterConfig.type)
+                .setTypes(clusterConfig.types)
                 .setBrokers(clusterConfig.brokers)
                 .setControllers(clusterConfig.controllers)
                 .setDisksPerBroker(clusterConfig.disksPerBroker)
@@ -199,7 +202,7 @@
     }
 
     public static class Builder {
-        private Type type;
+        private Set<Type> types;
         private int brokers;
         private int controllers;
         private int disksPerBroker;
@@ -219,8 +222,8 @@
 
         private Builder() {}
 
-        public Builder setType(Type type) {
-            this.type = type;
+        public Builder setTypes(Set<Type> types) {
+            this.types = Collections.unmodifiableSet(new HashSet<>(types));
             return this;
         }
 
@@ -307,7 +310,7 @@
         }
 
         public ClusterConfig build() {
-            return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
+            return new ClusterConfig(types, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
                     trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
                     adminClientProperties, saslServerProperties, saslClientProperties, perServerProperties);
         }
diff --git a/core/src/test/java/kafka/test/ClusterConfigTest.java b/core/src/test/java/kafka/test/ClusterConfigTest.java
index 1ad1659..e979b2e 100644
--- a/core/src/test/java/kafka/test/ClusterConfigTest.java
+++ b/core/src/test/java/kafka/test/ClusterConfigTest.java
@@ -46,7 +46,7 @@
         File trustStoreFile = TestUtils.tempFile();
 
         ClusterConfig clusterConfig = ClusterConfig.builder()
-                .setType(Type.KRAFT)
+                .setTypes(Collections.singleton(Type.KRAFT))
                 .setBrokers(3)
                 .setControllers(2)
                 .setDisksPerBroker(1)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 0259314..e7ef982 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -20,6 +20,7 @@
 import kafka.network.SocketServer;
 import kafka.server.BrokerFeatures;
 import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.network.ListenerName;
@@ -39,18 +40,10 @@
 
 public interface ClusterInstance {
 
-    enum ClusterType {
-        ZK,
-        RAFT
-    }
-
-    /**
-     * Cluster type. For now, only ZK is supported.
-     */
-    ClusterType clusterType();
+    Type type();
 
     default boolean isKRaftTest() {
-        return clusterType() == ClusterType.RAFT;
+        return type() == Type.KRAFT || type() == Type.CO_KRAFT;
     }
 
     /**
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 9d03206..96ba584 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -47,7 +47,7 @@
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
 
-@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
+@ClusterTestDefaults(types = {Type.ZK}, serverProperties = {
     @ClusterConfigProperty(key = "default.key", value = "default.value"),
     @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
 })   // Set defaults for a few params in @ClusterTest(s)
@@ -65,6 +65,7 @@
         Map<String, String> serverProperties = new HashMap<>();
         serverProperties.put("foo", "bar");
         clusterGenerator.accept(ClusterConfig.defaultBuilder()
+                .setTypes(Collections.singleton(Type.ZK))
                 .setName("Generated Test")
                 .setServerProperties(serverProperties)
                 .build());
@@ -74,14 +75,14 @@
     @ClusterTest
     public void testClusterTest(ClusterInstance clusterInstance) {
         Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
-        Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType()); // From the class level default
+        Assertions.assertEquals(Type.ZK, clusterInstance.type()); // From the class level default
         Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
     }
 
     // generate1 is a template method which generates any number of cluster configs
     @ClusterTemplate("generate1")
     public void testClusterTemplate() {
-        Assertions.assertEquals(ClusterInstance.ClusterType.ZK, clusterInstance.clusterType(),
+        Assertions.assertEquals(Type.ZK, clusterInstance.type(),
             "generate1 provided a Zk cluster, so we should see that here");
         Assertions.assertEquals("Generated Test", clusterInstance.config().name().orElse(""),
             "generate1 named this cluster config, so we should see that here");
@@ -90,19 +91,19 @@
 
     // Multiple @ClusterTest can be used with @ClusterTests
     @ClusterTests({
-        @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
+        @ClusterTest(name = "cluster-tests-1", types = {Type.ZK}, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "bar"),
             @ClusterConfigProperty(key = "spam", value = "eggs"),
             @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400
         }),
-        @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
+        @ClusterTest(name = "cluster-tests-2", types = {Type.KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
             @ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
             @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
             @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300")
         }),
-        @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
+        @ClusterTest(name = "cluster-tests-3", types = {Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
             @ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
@@ -136,7 +137,7 @@
                 Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
             }
             // In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
-            if (clusterInstance.config().clusterType() == Type.KRAFT) {
+            if (clusterInstance.type() == Type.KRAFT) {
                 try (Admin admin = Admin.create(Collections.singletonMap(
                         AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
                     ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
@@ -149,12 +150,8 @@
     }
 
     @ClusterTests({
-        @ClusterTest(clusterType = Type.ZK),
-        @ClusterTest(clusterType = Type.ZK, disksPerBroker = 2),
-        @ClusterTest(clusterType = Type.KRAFT),
-        @ClusterTest(clusterType = Type.KRAFT, disksPerBroker = 2),
-        @ClusterTest(clusterType = Type.CO_KRAFT),
-        @ClusterTest(clusterType = Type.CO_KRAFT, disksPerBroker = 2)
+        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
+        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
     })
     public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
         Admin admin = clusterInstance.createAdminClient();
@@ -178,21 +175,21 @@
     }
 
     @ClusterTests({
-        @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "enable-new-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
         }),
-        @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
         }),
-        @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
             @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
         }),
-        @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
             @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
         }),
-        @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
             @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
         }),
@@ -206,13 +203,13 @@
     }
 
     @ClusterTests({
-        @ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "disable-new-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
         }),
-        @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
         }),
-        @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+        @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
             @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
             @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
         }),
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index c3c9364..a114c3c 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -33,7 +33,7 @@
 @Retention(RUNTIME)
 @TestTemplate
 public @interface ClusterTest {
-    Type clusterType() default Type.DEFAULT;
+    Type[] types() default {};
     int brokers() default 0;
     int controllers() default 0;
     int disksPerBroker() default 0;
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
index af45b87..5e19c6f 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
@@ -35,7 +35,7 @@
 @Target({TYPE})
 @Retention(RUNTIME)
 public @interface ClusterTestDefaults {
-    Type clusterType() default Type.ZK;
+    Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
     int brokers() default 1;
     int controllers() default 1;
     int disksPerBroker() default 1;
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
index 27c2b79..3d35b30 100644
--- a/core/src/test/java/kafka/test/annotation/Type.java
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -45,20 +45,6 @@
         public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
             invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
         }
-    },
-    ALL {
-        @Override
-        public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
-            invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, false));
-            invocationConsumer.accept(new RaftClusterInvocationContext(baseDisplayName, config, true));
-            invocationConsumer.accept(new ZkClusterInvocationContext(baseDisplayName, config));
-        }
-    },
-    DEFAULT {
-        @Override
-        public void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
-            throw new UnsupportedOperationException("Cannot create invocation contexts for DEFAULT type");
-        }
     };
 
     public abstract void invocationContexts(String baseDisplayName, ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer);
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index 6f5ad19..02371fe 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -34,6 +34,7 @@
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -131,7 +132,11 @@
         generateClusterConfigurations(context, annot.value(), generatedClusterConfigs::add);
 
         String baseDisplayName = context.getRequiredTestMethod().getName();
-        generatedClusterConfigs.forEach(config -> config.clusterType().invocationContexts(baseDisplayName, config, testInvocations));
+        generatedClusterConfigs.forEach(config -> {
+            for (Type type: config.clusterTypes()) {
+                type.invocationContexts(baseDisplayName, config, testInvocations);
+            }
+        });
     }
 
     private void generateClusterConfigurations(ExtensionContext context, String generateClustersMethods, ClusterGenerator generator) {
@@ -142,7 +147,7 @@
 
     private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
                                     Consumer<TestTemplateInvocationContext> testInvocations) {
-        Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
+        Type[] types = annot.types().length == 0 ? defaults.types() : annot.types();
         Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
                 .filter(e -> e.id() == -1)
                 .collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b));
@@ -153,7 +158,7 @@
                         Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
 
         ClusterConfig config = ClusterConfig.builder()
-                .setType(type)
+                .setTypes(new HashSet<>(Arrays.asList(types)))
                 .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
                 .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
                 .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
@@ -165,7 +170,9 @@
                 .setSecurityProtocol(annot.securityProtocol())
                 .setMetadataVersion(annot.metadataVersion())
                 .build();
-        type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
+        for (Type type : types) {
+            type.invocationContexts(context.getRequiredTestMethod().getName(), config, testInvocations);
+        }
     }
 
     private ClusterTestDefaults getClusterTestDefaults(Class<?> testClass) {
diff --git a/core/src/test/java/kafka/test/junit/README.md b/core/src/test/java/kafka/test/junit/README.md
index 523c585..491e0de 100644
--- a/core/src/test/java/kafka/test/junit/README.md
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -10,11 +10,11 @@
 def testSomething(): Unit = { ... }
 ```
 
-This annotation has fields for cluster type and number of brokers, as well as commonly parameterized configurations. 
+This annotation has fields for a set of cluster types and the number of brokers, as well as commonly parameterized configurations.
 Arbitrary server properties can also be provided in the annotation:
 
 ```java
-@ClusterTest(clusterType = Type.Zk, securityProtocol = "PLAINTEXT", properties = {
+@ClusterTest(types = {Type.ZK}, securityProtocol = "PLAINTEXT", properties = {
   @ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
   @ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
 })
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 57bd796..ad4d549 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -21,6 +21,7 @@
 import kafka.server.BrokerFeatures;
 import kafka.server.BrokerServer;
 import kafka.server.ControllerServer;
+import kafka.test.annotation.Type;
 import kafka.test.ClusterConfig;
 import kafka.test.ClusterInstance;
 import kafka.testkit.KafkaClusterTestKit;
@@ -87,7 +88,7 @@
 
     @Override
     public List<Extension> getAdditionalExtensions() {
-        RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig);
+        RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, zkReference, clusterConfig, isCombined);
         return Arrays.asList(
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
@@ -131,11 +132,13 @@
         final AtomicBoolean started = new AtomicBoolean(false);
         final AtomicBoolean stopped = new AtomicBoolean(false);
         private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
+        private final boolean isCombined;
 
-        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, AtomicReference<EmbeddedZookeeper> zkReference, ClusterConfig clusterConfig) {
+        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, AtomicReference<EmbeddedZookeeper> zkReference, ClusterConfig clusterConfig, boolean isCombined) {
             this.clusterReference = clusterReference;
             this.zkReference = zkReference;
             this.clusterConfig = clusterConfig;
+            this.isCombined = isCombined;
         }
 
         @Override
@@ -209,8 +212,8 @@
         }
 
         @Override
-        public ClusterType clusterType() {
-            return ClusterType.RAFT;
+        public Type type() {
+            return isCombined ? Type.CO_KRAFT : Type.KRAFT;
         }
 
         @Override
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index ab23aed..de1cf53 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -21,6 +21,7 @@
 import kafka.network.SocketServer;
 import kafka.server.BrokerFeatures;
 import kafka.server.KafkaServer;
+import kafka.test.annotation.Type;
 import kafka.test.ClusterConfig;
 import kafka.test.ClusterInstance;
 import kafka.utils.EmptyTestInfo;
@@ -189,8 +190,8 @@
         }
 
         @Override
-        public ClusterType clusterType() {
-            return ClusterType.ZK;
+        public Type type() {
+            return Type.ZK;
         }
 
         @Override
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index b66d536..0d84892 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -45,7 +45,7 @@
         java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))
 
     clusterGenerator.accept(ClusterConfig.defaultBuilder()
-      .setType(Type.ZK)
+      .setTypes(Set(Type.ZK).asJava)
       .setBrokers(3)
       .setAutoStart(false)
       .setServerProperties(serverProperties)
@@ -61,9 +61,9 @@
 class ProducerIdsIntegrationTest {
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
-    new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
-    new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+    new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
+    new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+    new ClusterTest(types = Array(Type.KRAFT), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
@@ -76,7 +76,7 @@
     clusterInstance.stop()
   }
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
     new ClusterConfigProperty(key = "num.io.threads", value = "1")
   ))
   @Timeout(20)
@@ -87,7 +87,7 @@
   }
 
   @Disabled // TODO: Enable once producer id block size is configurable (KAFKA-15029)
-  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
     new ClusterConfigProperty(key = "num.io.threads", value = "2")
   ))
   def testMultipleAllocateProducerIdsRequest(clusterInstance: ClusterInstance): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
index 8ed6064..e0bc197 100644
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
@@ -47,7 +47,7 @@
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class KafkaServerKRaftRegistrationTest {
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -95,7 +95,7 @@
     }
   }
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
   def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
     // Bootstrap the ZK cluster ID into KRaft
     val clusterId = zkCluster.clusterId()
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5640f43..31edc3c 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -32,12 +32,12 @@
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class MetadataVersionIntegrationTest {
   @ClusterTests(value = Array(
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV2),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV3),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+      new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0),
+      new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV1),
+      new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV2),
+      new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV3),
+      new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0),
+      new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0)
   ))
   def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
@@ -60,7 +60,7 @@
     }, "Never saw metadata.version increase on broker")
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0)
+  @ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0)
   def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
     val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
@@ -69,7 +69,7 @@
     updateResult.all().get()
   }
 
-  @ClusterTest(clusterType = Type.KRAFT)
+  @ClusterTest(types = Array(Type.KRAFT))
   def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
     val describeResult = admin.describeFeatures()
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 6e4cd7d..464df8a 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -84,7 +84,7 @@
         .setMetadataVersion(mv)
         .setBrokers(3)
         .setServerProperties(serverProperties)
-        .setType(Type.ZK)
+        .setTypes(Set(Type.ZK).asJava)
         .build())
     }
   }
@@ -113,7 +113,7 @@
   }
 
   @ClusterTest(
-    brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
+    brokers = 3, types = Array(Type.ZK), autoStart = AutoStart.YES,
     metadataVersion = MetadataVersion.IBP_3_4_IV0,
     serverProperties = Array(
       new ClusterConfigProperty(key="authorizer.class.name", value="kafka.security.authorizer.AclAuthorizer"),
@@ -156,7 +156,7 @@
   }
 
   @ClusterTest(
-    brokers = 3, clusterType = Type.ZK, autoStart = AutoStart.YES,
+    brokers = 3, types = Array(Type.ZK), autoStart = AutoStart.YES,
     metadataVersion = MetadataVersion.IBP_3_4_IV0,
     serverProperties = Array(
       new ClusterConfigProperty(key = "authorizer.class.name", value = "kafka.security.authorizer.AclAuthorizer"),
@@ -219,7 +219,7 @@
    * and modifies data using AdminClient. The ZkMigrationClient is then used to read the metadata from ZK
    * as would happen during a migration. The generated records are then verified.
    */
-  @ClusterTest(brokers = 3, clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_4_IV0)
+  @ClusterTest(brokers = 3, types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_4_IV0)
   def testMigrate(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
     val newTopics = new util.ArrayList[NewTopic]()
@@ -429,7 +429,7 @@
   }
 
   // SCRAM and Quota are intermixed. Test SCRAM Only here
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -492,7 +492,7 @@
     }
   }
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -665,7 +665,7 @@
   }
 
   // SCRAM and Quota are intermixed. Test both here
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -730,7 +730,7 @@
     }
   }
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -809,7 +809,7 @@
     }
   }
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 4, metadataVersion = MetadataVersion.IBP_3_7_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 4, metadataVersion = MetadataVersion.IBP_3_7_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
@@ -893,7 +893,7 @@
     }
   }
 
-  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index 5cb5957..cea7835 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -32,7 +32,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class AllocateProducerIdsRequestTest(cluster: ClusterInstance) {
 
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index de5720f..c02874e 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -21,7 +21,7 @@
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
@@ -30,11 +30,10 @@
 // TODO: Introduce template in ClusterTests https://issues.apache.org/jira/browse/KAFKA-16595
 //  currently we can't apply template in ClusterTests hence we see bunch of duplicate settings in ClusterTests
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+    new ClusterTest(types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
       new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
       // Configure control plane listener to make sure we have separate listeners for testing.
@@ -43,11 +42,7 @@
       new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
       new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
     )),
-    new ClusterTest(clusterType = Type.CO_KRAFT, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
-      new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
-      new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
-    )),
-    new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+    new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
       new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
     )),
@@ -59,7 +54,7 @@
   }
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+    new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
       new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
       // Configure control plane listener to make sure we have separate listeners for testing.
@@ -68,11 +63,7 @@
       new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
       new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
     )),
-    new ClusterTest(clusterType = Type.CO_KRAFT, serverProperties = Array(
-      new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
-      new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
-    )),
-    new ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+    new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
       new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
     )),
@@ -83,7 +74,7 @@
     validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
   }
 
-  @ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), serverProperties = Array(
     // Configure control plane listener to make sure we have separate listeners for testing.
     new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
     new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
@@ -96,7 +87,7 @@
     validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get())
   }
 
-  @ClusterTest(clusterType = Type.KRAFT)
+  @ClusterTest(types = Array(Type.KRAFT))
   def testApiVersionsRequestThroughControllerListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controllerListenerName.get())
@@ -104,15 +95,14 @@
   }
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+    new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
       // Configure control plane listener to make sure we have separate listeners for testing.
       new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
       new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
       new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
       new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
     )),
-    new ClusterTest(clusterType = Type.CO_KRAFT),
-    new ClusterTest(clusterType = Type.KRAFT),
+    new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)),
   ))
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
@@ -126,7 +116,7 @@
   }
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
+    new ClusterTest(types = Array(Type.ZK), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
       new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
       // Configure control plane listener to make sure we have separate listeners for testing.
@@ -135,11 +125,7 @@
       new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
       new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
     )),
-    new ClusterTest(clusterType = Type.CO_KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
-      new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
-      new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
-    )),
-    new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
+    new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
       new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
       new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
     )),
@@ -150,7 +136,7 @@
     validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0)
   }
 
-  @ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK), serverProperties = Array(
     // Configure control plane listener to make sure we have separate listeners for testing.
     new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
     new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
@@ -163,7 +149,7 @@
     validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get())
   }
 
-  @ClusterTest(clusterType = Type.KRAFT)
+  @ClusterTest(types = Array(Type.KRAFT))
   def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName.get())
@@ -171,15 +157,14 @@
   }
 
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.ZK, serverProperties = Array(
+    new ClusterTest(types = Array(Type.ZK), serverProperties = Array(
       // Configure control plane listener to make sure we have separate listeners for testing.
       new ClusterConfigProperty(key = "control.plane.listener.name", value = "CONTROL_PLANE"),
       new ClusterConfigProperty(key = "listener.security.protocol.map", value = "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
       new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
       new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0"),
     )),
-    new ClusterTest(clusterType = Type.CO_KRAFT),
-    new ClusterTest(clusterType = Type.KRAFT),
+    new ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT)),
   ))
   def testApiVersionsRequestValidationV3(): Unit = {
     // Invalid request because Name and Version are empty by default
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index 1e3adc3..40d7a52 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.ClusterTest
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -28,7 +28,6 @@
 
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class BrokerMetricNamesTest(cluster: ClusterInstance) {
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 6c6199f..73cab9c 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -134,7 +134,7 @@
     Errors.forCode(resp.topics().find(topicName).errorCode())
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+  @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
     serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
   def testRegisterZkWithKRaftMigrationDisabled(clusterInstance: ClusterInstance): Unit = {
     val clusterId = clusterInstance.clusterId()
@@ -162,7 +162,7 @@
     }
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
+  @ClusterTest(types = Array(Type.KRAFT), brokers = 0, controllers = 1, metadataVersion = MetadataVersion.IBP_3_3_IV3,
     serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "false")))
   def testRegisterZkWith33Controller(clusterInstance: ClusterInstance): Unit = {
     // Verify that a controller running an old metadata.version cannot register a ZK broker
@@ -195,7 +195,7 @@
   }
 
   @ClusterTest(
-    clusterType = Type.KRAFT,
+    types = Array(Type.KRAFT),
     brokers = 1,
     controllers = 1,
     metadataVersion = MetadataVersion.IBP_3_4_IV0,
@@ -235,7 +235,7 @@
    * through the RPCs. The migration never proceeds past pre-migration since no ZK brokers are registered.
    */
   @ClusterTests(Array(
-    new ClusterTest(clusterType = Type.KRAFT, autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
+    new ClusterTest(types = Array(Type.KRAFT), autoStart = AutoStart.NO, controllers = 1, metadataVersion = MetadataVersion.IBP_3_4_IV0,
       serverProperties = Array(new ClusterConfigProperty(key = "zookeeper.metadata.migration.enable", value = "true")))
   ))
   def testNoMetadataChangesInPreMigrationMode(clusterInstance: ClusterInstance): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index ed80f80..8954c89 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -21,7 +21,7 @@
 import java.util
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
@@ -36,7 +36,6 @@
 
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @Tag("integration")
 class ClientQuotasRequestTest(cluster: ClusterInstance) {
@@ -168,7 +167,7 @@
     ))
   }
 
-  @ClusterTest(clusterType = Type.ZK) // No SCRAM for Raft yet
+  @ClusterTest(types = Array(Type.ZK)) // No SCRAM for Raft yet
   def testClientQuotasForScramUsers(): Unit = {
     val userName = "user"
 
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 250cc83..af67f77 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
 import kafka.utils.TestUtils
@@ -34,7 +34,6 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
 @Tag("integration")
 class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
 
@@ -49,7 +48,7 @@
     assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@@ -137,7 +136,7 @@
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@@ -251,7 +250,7 @@
     assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
   }
 
-  @ClusterTest(clusterType = Type.KRAFT, serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 03eddbe..969e3e9 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -31,7 +31,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 92cea4c..10b94c8 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -28,7 +28,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -51,7 +51,7 @@
     testDeleteGroups(false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 45f967f..a18f5e6 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -30,7 +30,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -42,7 +42,7 @@
     testDescribeGroups()
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index 2a94655..73b8887 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -33,11 +33,11 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
-  @ClusterTest(clusterType = Type.ZK)
+  @ClusterTest(types = Array(Type.ZK))
   def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
     val apiRequest = new ApiVersionsRequest.Builder().build()
     val apiResponse =  connectAndReceive[ApiVersionsResponse](apiRequest)
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 80a308e..02f6e2d 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -34,7 +34,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -46,7 +46,7 @@
     testHeartbeat()
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
index 21c0de1..f626fa9 100644
--- a/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/JoinGroupRequestTest.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
@@ -38,10 +38,9 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
 @Tag("integration")
 class JoinGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
-  @ClusterTest(serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
@@ -50,7 +49,7 @@
     testJoinGroup()
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 3eaff93..47e67e5 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -28,7 +28,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -40,7 +40,7 @@
     testLeaveGroup()
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index c03dffc..b992394 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -30,7 +30,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -53,7 +53,7 @@
     testListGroups(false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 346b170..7791c6c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -26,7 +26,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
 
@@ -50,7 +50,7 @@
     testOffsetCommit(false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 8b4561d..8fabac4 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -26,7 +26,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -49,7 +49,7 @@
     testOffsetDelete(false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 79ec402..5f51d6c 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -34,7 +34,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
 
@@ -60,7 +60,7 @@
     testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
     new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
@@ -93,7 +93,7 @@
     testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
     new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
@@ -126,7 +126,7 @@
     testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false)
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"),
     new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"),
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 4b7e129..adb528e 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -61,7 +61,7 @@
 
     clusterGenerator.accept(ClusterConfig.defaultBuilder
       .setSecurityProtocol(securityProtocol)
-      .setType(Type.ZK)
+      .setTypes(Set(Type.ZK).asJava)
       .setSaslServerProperties(saslServerProperties)
       .setSaslClientProperties(saslClientProperties)
       .setServerProperties(serverProperties)
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index ac0d068..b958f03 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -35,7 +35,7 @@
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(types = Array(Type.KRAFT))
 @Tag("integration")
 class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
   @ClusterTest(serverProperties = Array(
@@ -47,7 +47,7 @@
     testSyncGroup()
   }
 
-  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic"),
     new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
     new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
diff --git a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
index e04b904..8fae617 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java
@@ -19,8 +19,6 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -49,7 +47,6 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")
 public class DeleteRecordsCommandTest {
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index fef3803..a897827 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -45,7 +45,7 @@
 @ExtendWith(value = ClusterTestExtensions.class)
 @Tag("integration")
 public class FeatureCommandTest {
-    @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testDescribeWithZK(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
@@ -53,7 +53,7 @@
         assertEquals("", commandOutput);
     }
 
-    @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testDescribeWithKRaft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
@@ -63,7 +63,7 @@
                 "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput));
     }
 
-    @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4)
+    @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_7_IV4)
     public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe"))
@@ -73,7 +73,7 @@
                 "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(commandOutput));
     }
 
-    @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testUpgradeMetadataVersionWithZk(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -83,7 +83,7 @@
                 "update because the provided feature is not supported.", commandOutput);
     }
 
-    @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -98,7 +98,7 @@
         assertEquals("metadata.version was upgraded to 6.", commandOutput);
     }
 
-    @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testDowngradeMetadataVersionWithZk(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -121,7 +121,7 @@
                 "update because the provided feature is not supported.", commandOutput);
     }
 
-    @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1)
+    @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 231222e..42d6581 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -21,7 +21,6 @@
 import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
@@ -53,7 +52,7 @@
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterTestDefaults(serverProperties = {
     @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
     @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
     @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4")
diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index e2267e5..b8842da 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -20,7 +20,6 @@
 import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.admin.Admin;
@@ -63,7 +62,7 @@
 
 @SuppressWarnings("deprecation")
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {
+@ClusterTestDefaults(brokers = 3, serverProperties = {
     @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
     @ClusterConfigProperty(key = "auto.leader.rebalance.enable", value = "false"),
     @ClusterConfigProperty(key = "controlled.shutdown.enable", value = "true"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index aa858f4..3565036 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -18,7 +18,6 @@
 
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterTest;
-import kafka.test.annotation.ClusterTestDefaults;
 import kafka.test.annotation.ClusterTests;
 import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;
@@ -44,7 +43,6 @@
 import static java.util.Arrays.stream;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.KRAFT)
 @Tag("integration")
 class MetadataQuorumCommandTest {
 
@@ -59,12 +57,9 @@
      * 3. Fewer brokers than controllers
      */
     @ClusterTests({
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 2),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 2),
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 1),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 2),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 2)
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
     })
     public void testDescribeQuorumReplicationSuccessful() throws InterruptedException {
         cluster.waitForReadyBrokers();
@@ -73,7 +68,7 @@
         );
 
         List<String> outputs = stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList());
-        if (cluster.config().clusterType() == Type.CO_KRAFT)
+        if (cluster.type() == Type.CO_KRAFT)
           assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.size());
         else
           assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.size());
@@ -86,7 +81,7 @@
         assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count());
 
         Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Observer\\s*");
-        if (cluster.config().clusterType() == Type.CO_KRAFT)
+        if (cluster.type() == Type.CO_KRAFT)
             assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()),
                 outputs.stream().filter(o -> observerPattern.matcher(o).find()).count());
         else
@@ -100,12 +95,9 @@
      * 3. Fewer brokers than controllers
      */
     @ClusterTests({
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 2),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 2),
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 1),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 2),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 2)
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
     })
     public void testDescribeQuorumStatusSuccessful() throws InterruptedException {
         cluster.waitForReadyBrokers();
@@ -124,16 +116,13 @@
         assertTrue(outputs[6].matches("CurrentVoters:\\s+\\[\\d+(,\\d+)*]"));
 
         // There are no observers if we have fewer brokers than controllers
-        if (cluster.config().clusterType() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers())
+        if (cluster.type() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers())
             assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[]"));
         else
             assertTrue(outputs[7].matches("CurrentObservers:\\s+\\[\\d+(,\\d+)*]"));
     }
 
-    @ClusterTests({
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1),
-        @ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1)
-    })
+    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
     public void testOnlyOneBrokerAndOneController() {
         String statusOutput = ToolsTestUtils.captureStandardOut(() ->
             MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
@@ -147,9 +136,7 @@
         assertEquals("0", replicationOutput.split("\n")[1].split("\\s+")[2]);
     }
 
-    @ClusterTests({
-        @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1)
-    })
+    @ClusterTest(types = {Type.CO_KRAFT})
     public void testCommandConfig() throws IOException {
         // specifying a --command-config containing properties that would prevent login must fail
         File tmpfile = TestUtils.tempFile(AdminClientConfig.SECURITY_PROTOCOL_CONFIG + "=SSL_PLAINTEXT");
@@ -157,7 +144,7 @@
                         "--command-config", tmpfile.getAbsolutePath(), "describe", "--status"));
     }
 
-    @ClusterTest(clusterType = Type.ZK, brokers = 1)
+    @ClusterTest(types = {Type.ZK})
     public void testDescribeQuorumInZkMode() {
         assertInstanceOf(UnsupportedVersionException.class, assertThrows(
                 ExecutionException.class,
@@ -171,7 +158,7 @@
 
     }
 
-    @ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1)
+    @ClusterTest(types = {Type.CO_KRAFT})
     public void testHumanReadableOutput() {
         assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--human-readable"));
         assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status", "--human-readable"));
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index 1ffad2b..36dfab5 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -33,6 +33,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Collections.singleton;
 import static kafka.test.annotation.Type.CO_KRAFT;
@@ -53,40 +55,21 @@
         serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
         serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
 
-        ClusterConfig zk = ClusterConfig.defaultBuilder()
-                .setType(ZK)
+        ClusterConfig classicGroupCoordinator = ClusterConfig.defaultBuilder()
+                .setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
                 .setServerProperties(serverProperties)
                 .build();
-        clusterGenerator.accept(zk);
-
-        ClusterConfig raftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
-                .setType(KRAFT)
-                .setServerProperties(serverProperties)
-                .build();
-        clusterGenerator.accept(raftWithLegacyCoordinator);
-
-        ClusterConfig combinedKRaftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
-                .setType(CO_KRAFT)
-                .setServerProperties(serverProperties)
-                .build();
-        clusterGenerator.accept(combinedKRaftWithLegacyCoordinator);
+        clusterGenerator.accept(classicGroupCoordinator);
 
         // Following are test case config with new group coordinator
         serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
 
-        ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
-                .setType(KRAFT)
-                .setName("newGroupCoordinator")
+        ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder()
+                .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
+                .setName("consumerGroupCoordinator")
                 .setServerProperties(serverProperties)
                 .build();
-        clusterGenerator.accept(raftWithNewGroupCoordinator);
-
-        ClusterConfig combinedKRaftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
-                .setType(CO_KRAFT)
-                .setName("newGroupCoordinator")
-                .setServerProperties(serverProperties)
-                .build();
-        clusterGenerator.accept(combinedKRaftWithNewGroupCoordinator);
+        clusterGenerator.accept(consumerGroupCoordinator);
     }
 
     static <T> AutoCloseable buildConsumers(int numberOfConsumers,