KAFKA-16593 Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions (#15766)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 1b44541..8970d49 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -107,6 +107,8 @@
     <allow pkg="kafka.server"/>
     <allow pkg="kafka.zk" />
     <allow pkg="org.apache.kafka.clients.admin"/>
+    <allow pkg="org.apache.kafka.clients.consumer"/>
+    <allow pkg="org.apache.kafka.coordinator.group"/>
     <subpackage name="annotation">
       <allow pkg="kafka.test"/>
     </subpackage>
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index c04f9ec..0259314 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -21,14 +21,22 @@
 import kafka.server.BrokerFeatures;
 import kafka.test.annotation.ClusterTest;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.network.ListenerName;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+
 public interface ClusterInstance {
 
     enum ClusterType {
@@ -145,4 +153,18 @@
     void startBroker(int brokerId);
 
     void waitForReadyBrokers() throws InterruptedException;
+
+    default Set<GroupProtocol> supportedGroupProtocols() {
+        Map<String, String> serverProperties = config().serverProperties();
+        Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
+        supportedGroupProtocols.add(CLASSIC);
+
+        // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
+        if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||
+                serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) {
+            supportedGroupProtocols.add(CONSUMER);
+        }
+
+        return Collections.unmodifiableSet(supportedGroupProtocols);
+    }
 }
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 0eb94ab..1bc56df 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -27,15 +27,24 @@
 import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+
+
 @ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
     @ClusterConfigProperty(key = "default.key", value = "default.value"),
 })   // Set defaults for a few params in @ClusterTest(s)
@@ -135,4 +144,49 @@
     public void testDefaults(ClusterInstance clusterInstance) {
         Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, clusterInstance.config().metadataVersion());
     }
+
+    @ClusterTests({
+        @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+        }),
+        @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+        @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+        @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+        }),
+        @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+        }),
+    })
+    public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
+        Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
+        supportedGroupProtocols.add(CLASSIC);
+        supportedGroupProtocols.add(CONSUMER);
+        Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
+        Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size());
+    }
+
+    @ClusterTests({
+        @ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+        }),
+        @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+        }),
+        @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+        }),
+    })
+    public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
+        Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
+        Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 92fa463..27151cc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -276,7 +276,7 @@
             return new ArrayList<>(result.all().get());
         }
 
-        private boolean shouldPrintMemberState(String group, Optional<String> state, Optional<Integer> numRows) {
+        private boolean shouldPrintMemberState(String group, Optional<ConsumerGroupState> state, Optional<Integer> numRows) {
             // numRows contains the number of data rows, if any, compiled from the API call in the caller method.
             // if it's undefined or 0, there is no relevant group information to display.
             if (!numRows.isPresent()) {
@@ -286,37 +286,37 @@
 
             int num = numRows.get();
 
-            String state0 = state.orElse("NONE");
+            ConsumerGroupState state0 = state.orElse(ConsumerGroupState.UNKNOWN);
             switch (state0) {
-                case "Dead":
+                case DEAD:
                     printError("Consumer group '" + group + "' does not exist.", Optional.empty());
                     break;
-                case "Empty":
+                case EMPTY:
                     System.err.println("\nConsumer group '" + group + "' has no active members.");
                     break;
-                case "PreparingRebalance":
-                case "CompletingRebalance":
-                case "Assigning":
-                case "Reconciling":
+                case PREPARING_REBALANCE:
+                case COMPLETING_REBALANCE:
+                case ASSIGNING:
+                case RECONCILING:
                     System.err.println("\nWarning: Consumer group '" + group + "' is rebalancing.");
                     break;
-                case "Stable":
+                case STABLE:
                     break;
                 default:
                     // the control should never reach here
                     throw new KafkaException("Expected a valid consumer group state, but found '" + state0 + "'.");
             }
 
-            return !state0.contains("Dead") && num > 0;
+            return !state0.equals(ConsumerGroupState.DEAD) && num > 0;
         }
 
         private Optional<Integer> size(Optional<? extends Collection<?>> colOpt) {
             return colOpt.map(Collection::size);
         }
 
-        private void printOffsets(Map<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
+        private void printOffsets(Map<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
             offsets.forEach((groupId, tuple) -> {
-                Optional<String> state = tuple.getKey();
+                Optional<ConsumerGroupState> state = tuple.getKey();
                 Optional<Collection<PartitionAssignmentState>> assignments = tuple.getValue();
 
                 if (shouldPrintMemberState(groupId, state, size(assignments))) {
@@ -357,9 +357,9 @@
             return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
         }
 
-        private void printMembers(Map<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
+        private void printMembers(Map<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
             members.forEach((groupId, tuple) -> {
-                Optional<String> state = tuple.getKey();
+                Optional<ConsumerGroupState> state = tuple.getKey();
                 Optional<Collection<MemberAssignmentState>> assignments = tuple.getValue();
                 int maxGroupLen = 15, maxConsumerIdLen = 15, maxGroupInstanceIdLen = 17, maxHostLen = 15, maxClientIdLen = 15;
                 boolean includeGroupInstanceId = false;
@@ -430,7 +430,7 @@
                     String format = "\n%" + -coordinatorColLen + "s %-25s %-20s %-15s %s";
 
                     System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
-                    System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers);
+                    System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state.toString(), state.numMembers);
                     System.out.println();
                 }
             });
@@ -446,11 +446,11 @@
             long subActions = Stream.of(membersOptPresent, offsetsOptPresent, stateOptPresent).filter(x -> x).count();
 
             if (subActions == 0 || offsetsOptPresent) {
-                TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets
+                TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets
                     = collectGroupsOffsets(groupIds);
                 printOffsets(offsets);
             } else if (membersOptPresent) {
-                TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members
+                TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> members
                     = collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt));
                 printMembers(members, opts.options.has(opts.verboseOpt));
             } else {
@@ -684,16 +684,16 @@
         /**
          * Returns the state of the specified consumer group and partition assignment states
          */
-        Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
             return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
         }
 
         /**
          * Returns states of the specified consumer groups and partition assignment states
          */
-        TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
+        TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
             Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
-            TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
+            TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
 
             consumerGroups.forEach((groupId, consumerGroup) -> {
                 ConsumerGroupState state = consumerGroup.state();
@@ -735,22 +735,22 @@
 
                 rowsWithConsumer.addAll(rowsWithoutConsumer);
 
-                groupOffsets.put(groupId, new SimpleImmutableEntry<>(Optional.of(state.toString()), Optional.of(rowsWithConsumer)));
+                groupOffsets.put(groupId, new SimpleImmutableEntry<>(Optional.of(state), Optional.of(rowsWithConsumer)));
             });
 
             return groupOffsets;
         }
 
-        Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
             return collectGroupsMembers(Collections.singleton(groupId), verbose).get(groupId);
         }
 
-        TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
+        TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
             Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
-            TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
+            TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
 
             consumerGroups.forEach((groupId, consumerGroup) -> {
-                String state = consumerGroup.state().toString();
+                ConsumerGroupState state = consumerGroup.state();
                 List<MemberAssignmentState> memberAssignmentStates = consumerGroup.members().stream().map(consumer ->
                     new MemberAssignmentState(
                         groupId,
@@ -778,7 +778,7 @@
                     groupId,
                     groupDescription.coordinator(),
                     groupDescription.partitionAssignor(),
-                    groupDescription.state().toString(),
+                    groupDescription.state(),
                     groupDescription.members().size()
             )));
             return res;
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java
index 04a3b2e..18cd3f4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java
@@ -16,16 +16,17 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.Node;
 
 class GroupState {
     final String group;
     final Node coordinator;
     final String assignmentStrategy;
-    final String state;
+    final ConsumerGroupState state;
     final int numMembers;
 
-    GroupState(String group, Node coordinator, String assignmentStrategy, String state, int numMembers) {
+    GroupState(String group, Node coordinator, String assignmentStrategy, ConsumerGroupState state, int numMembers) {
         this.group = group;
         this.coordinator = coordinator;
         this.assignmentStrategy = assignmentStrategy;
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
new file mode 100644
index 0000000..1ffad2b
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static java.util.Collections.singleton;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+
+class ConsumerGroupCommandTestUtils {
+
+    private ConsumerGroupCommandTestUtils() {
+    }
+
+    static void generator(ClusterGenerator clusterGenerator) {
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
+        serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+        serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
+
+        ClusterConfig zk = ClusterConfig.defaultBuilder()
+                .setType(ZK)
+                .setServerProperties(serverProperties)
+                .build();
+        clusterGenerator.accept(zk);
+
+        ClusterConfig raftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
+                .setType(KRAFT)
+                .setServerProperties(serverProperties)
+                .build();
+        clusterGenerator.accept(raftWithLegacyCoordinator);
+
+        ClusterConfig combinedKRaftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
+                .setType(CO_KRAFT)
+                .setServerProperties(serverProperties)
+                .build();
+        clusterGenerator.accept(combinedKRaftWithLegacyCoordinator);
+
+        // Following are test case config with new group coordinator
+        serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
+
+        ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
+                .setType(KRAFT)
+                .setName("newGroupCoordinator")
+                .setServerProperties(serverProperties)
+                .build();
+        clusterGenerator.accept(raftWithNewGroupCoordinator);
+
+        ClusterConfig combinedKRaftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
+                .setType(CO_KRAFT)
+                .setName("newGroupCoordinator")
+                .setServerProperties(serverProperties)
+                .build();
+        clusterGenerator.accept(combinedKRaftWithNewGroupCoordinator);
+    }
+
+    static <T> AutoCloseable buildConsumers(int numberOfConsumers,
+                                            boolean syncCommit,
+                                            String topic,
+                                            Supplier<KafkaConsumer<T, T>> consumerSupplier) {
+        List<KafkaConsumer<T, T>> consumers = new ArrayList<>(numberOfConsumers);
+        ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        final AutoCloseable closeable = () -> releaseConsumers(closed, consumers, executor);
+        try {
+            for (int i = 0; i < numberOfConsumers; i++) {
+                KafkaConsumer<T, T> consumer = consumerSupplier.get();
+                consumers.add(consumer);
+                executor.execute(() -> initConsumer(topic, syncCommit, consumer, closed));
+            }
+            return closeable;
+        } catch (Throwable e) {
+            Utils.closeQuietly(closeable, "Release Consumer");
+            throw e;
+        }
+    }
+
+    private static <T> void releaseConsumers(AtomicBoolean closed, List<KafkaConsumer<T, T>> consumers, ExecutorService executor) throws InterruptedException {
+        closed.set(true);
+        consumers.forEach(KafkaConsumer::wakeup);
+        executor.shutdown();
+        executor.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+    private static <T> void initConsumer(String topic, boolean syncCommit, KafkaConsumer<T, T> consumer, AtomicBoolean closed) {
+        try (KafkaConsumer<T, T> kafkaConsumer = consumer) {
+            kafkaConsumer.subscribe(singleton(topic));
+            while (!closed.get()) {
+                consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+                if (syncCommit)
+                    consumer.commitSync();
+            }
+        } catch (WakeupException e) {
+            // OK
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index a30ce40..c0d4bf9 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -90,8 +90,8 @@
         when(admin.listOffsets(offsetsArgMatcher(), any()))
                 .thenReturn(listOffsetsResult());
 
-        Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
-        assertEquals(Optional.of("Stable"), statesAndAssignments.getKey());
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+        assertEquals(Optional.of(ConsumerGroupState.STABLE), statesAndAssignments.getKey());
         assertTrue(statesAndAssignments.getValue().isPresent());
         assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.getValue().get().size());
 
@@ -163,8 +163,8 @@
         )).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey()))
                 .collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
 
-        Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
-        Optional<String> state = statesAndAssignments.getKey();
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+        Optional<ConsumerGroupState> state = statesAndAssignments.getKey();
         Optional<Collection<PartitionAssignmentState>> assignments = statesAndAssignments.getValue();
 
         Map<TopicPartition, Optional<Long>> returnedOffsets = assignments.map(results ->
@@ -182,7 +182,7 @@
         expectedOffsets.put(testTopicPartition4, Optional.of(100L));
         expectedOffsets.put(testTopicPartition5, Optional.empty());
 
-        assertEquals(Optional.of("Stable"), state);
+        assertEquals(Optional.of(ConsumerGroupState.STABLE), state);
         assertEquals(expectedOffsets, returnedOffsets);
 
         verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any());
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
index f50f395..c37a9a6 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
@@ -17,279 +17,326 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
+
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+
+    private static void generator(ClusterGenerator clusterGenerator) {
+        ConsumerGroupCommandTestUtils.generator(clusterGenerator);
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
-
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    @Test
+    public void testDeleteWithTopicOption() {
+        String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:62241", "--delete", "--group", getDummyGroupId(), "--topic"};
+        assertThrows(OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs(cgcArgs));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
-
-        // note the group to be deleted is a different (non-existing) group
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
-        Map<String, Throwable> result = service.deleteGroups();
-        assertTrue(result.size() == 1 && result.containsKey(missingGroup) && result.get(missingGroup).getCause() instanceof GroupIdNotFoundException,
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+    @ClusterTemplate("generator")
+    public void testDeleteCmdNonExistingGroup(ClusterInstance cluster) {
+        String missingGroupId = getDummyGroupId();
+        String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
+        try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
+            String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
+            assertTrue(output.contains("Group '" + missingGroupId + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
+                    "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonEmptyGroup(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
-
-        // run one consumer in the group
-        addConsumerGroupExecutor(1);
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
-        TestUtils.waitForCondition(
-            () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1,
-            "The group did not initialize as expected."
-        );
-
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
-            "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
+    @ClusterTemplate("generator")
+    public void testDeleteNonExistingGroup(ClusterInstance cluster) {
+        String missingGroupId = getDummyGroupId();
+        String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
+        try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
+            Map<String, Throwable> result = service.deleteGroups();
+            assertEquals(1, result.size());
+            assertNotNull(result.get(missingGroupId));
+            assertInstanceOf(GroupIdNotFoundException.class,
+                    result.get(missingGroupId).getCause(),
+                    "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteNonEmptyGroup(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDeleteNonEmptyGroup(ClusterInstance cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String groupId = composeGroupId(groupProtocol);
+            String topicName = composeTopicName(groupProtocol);
+            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+            try (
+                    AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+                    ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+            ) {
+                TestUtils.waitForCondition(
+                        () -> service.collectGroupMembers(groupId, false).getValue().get().size() == 1,
+                        "The group did not initialize as expected."
+                );
 
-        // run one consumer in the group
-        addConsumerGroupExecutor(1);
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+                String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
+                Map<String, Throwable> result = service.deleteGroups();
 
-        TestUtils.waitForCondition(
-            () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1,
-            "The group did not initialize as expected."
-        );
+                assertTrue(output.contains("Group '" + groupId + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
+                        "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
 
-        Map<String, Throwable> result = service.deleteGroups();
-        assertNotNull(result.get(GROUP),
-            "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
-        assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP).getCause() instanceof GroupNotEmptyException,
-            "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
+                assertNotNull(result.get(groupId),
+                        "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
+
+                assertEquals(1, result.size());
+                assertNotNull(result.get(groupId));
+                assertInstanceOf(GroupNotEmptyException.class,
+                        result.get(groupId).getCause(),
+                        "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
+            }
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdEmptyGroup(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    void testDeleteEmptyGroup(ClusterInstance cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String groupId = composeGroupId(groupProtocol);
+            String topicName = composeTopicName(groupProtocol);
+            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+            try (
+                    AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+                    ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+            ) {
+                TestUtils.waitForCondition(
+                        () -> service.listConsumerGroups().contains(groupId) && checkGroupState(service, groupId, STABLE),
+                        "The group did not initialize as expected."
+                );
 
-        // run one consumer in the group
-        ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+                consumerGroupCloseable.close();
 
-        TestUtils.waitForCondition(
-            () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
-            "The group did not initialize as expected."
-        );
+                TestUtils.waitForCondition(
+                        () -> checkGroupState(service, groupId, EMPTY),
+                        "The group did not become empty as expected."
+                );
 
-        executor.shutdown();
+                Map<String, Throwable> result = new HashMap<>();
+                String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
 
-        TestUtils.waitForCondition(
-            () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
-            "The group did not become empty as expected."
-        );
-
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Deletion of requested consumer groups ('" + GROUP + "') was successful."),
-            "The consumer group could not be deleted as expected");
+                assertTrue(output.contains("Deletion of requested consumer groups ('" + groupId + "') was successful."),
+                        "The consumer group could not be deleted as expected");
+                assertEquals(1, result.size());
+                assertTrue(result.containsKey(groupId));
+                assertNull(result.get(groupId), "The consumer group could not be deleted as expected");
+            }
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdAllGroups(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDeleteCmdAllGroups(ClusterInstance cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String topicName = composeTopicName(groupProtocol);
+            // Create 3 groups with 1 consumer each
+            Map<String, AutoCloseable> groupIdToExecutor = IntStream.rangeClosed(1, 3)
+                    .mapToObj(i -> composeGroupId(groupProtocol) + i)
+                    .collect(Collectors.toMap(Function.identity(), group -> consumerGroupClosable(cluster, groupProtocol, group, topicName)));
+            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups"};
 
-        // Create 3 groups with 1 consumer per each
-        Map<String, ConsumerGroupExecutor> groups = IntStream.rangeClosed(1, 3).mapToObj(i -> GROUP + i).collect(Collectors.toMap(
-            Function.identity(),
-            group -> addConsumerGroupExecutor(1, TOPIC, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, GroupProtocol.CLASSIC.name)
-        ));
+            try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
+                TestUtils.waitForCondition(() ->
+                                new HashSet<>(service.listConsumerGroups()).equals(groupIdToExecutor.keySet()) &&
+                                        groupIdToExecutor.keySet().stream().allMatch(groupId -> assertDoesNotThrow(() -> checkGroupState(service, groupId, STABLE))),
+                        "The group did not initialize as expected.");
 
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--all-groups"};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
-        TestUtils.waitForCondition(() ->
-            new HashSet<>(service.listConsumerGroups()).equals(groups.keySet()) &&
-                groups.keySet().stream().allMatch(groupId -> {
-                    try {
-                        return Objects.equals(service.collectGroupState(groupId).state, "Stable");
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }),
-            "The group did not initialize as expected.");
-
-        // Shutdown consumers to empty out groups
-        groups.values().forEach(AbstractConsumerGroupExecutor::shutdown);
-
-        TestUtils.waitForCondition(() ->
-            groups.keySet().stream().allMatch(groupId -> {
-                try {
-                    return Objects.equals(service.collectGroupState(groupId).state, "Empty");
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
+                // Shutdown consumers to empty out groups
+                for (AutoCloseable consumerGroupExecutor : groupIdToExecutor.values()) {
+                    consumerGroupExecutor.close();
                 }
-            }),
-            "The group did not become empty as expected.");
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups).trim();
-        Set<String> expectedGroupsForDeletion = groups.keySet();
-        Set<String> deletedGroupsGrepped = Arrays.stream(output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(","))
-            .map(str -> str.replaceAll("'", "").trim()).collect(Collectors.toSet());
+                TestUtils.waitForCondition(() ->
+                                groupIdToExecutor.keySet().stream().allMatch(groupId -> assertDoesNotThrow(() -> checkGroupState(service, groupId, EMPTY))),
+                        "The group did not become empty as expected.");
 
-        assertTrue(output.matches("Deletion of requested consumer groups (.*) was successful.")
-            && Objects.equals(deletedGroupsGrepped, expectedGroupsForDeletion),
-            "The consumer group(s) could not be deleted as expected");
+                String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups).trim();
+                Set<String> expectedGroupsForDeletion = groupIdToExecutor.keySet();
+                Set<String> deletedGroupsGrepped = Arrays.stream(output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(","))
+                        .map(str -> str.replaceAll("'", "").trim())
+                        .collect(Collectors.toSet());
+
+                assertTrue(output.matches("Deletion of requested consumer groups (.*) was successful.")
+                                && Objects.equals(deletedGroupsGrepped, expectedGroupsForDeletion),
+                        "The consumer group(s) could not be deleted as expected");
+            }
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteEmptyGroup(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
+    @ClusterTemplate("generator")
+    public void testDeleteCmdWithMixOfSuccessAndError(ClusterInstance cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String groupId = composeGroupId(groupProtocol);
+            String topicName = composeTopicName(groupProtocol);
+            String missingGroupId = composeMissingGroupId(groupProtocol);
+            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+            try (
+                    AutoCloseable consumerGroupClosable = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+                    ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+            ) {
+                TestUtils.waitForCondition(
+                        () -> service.listConsumerGroups().contains(groupId) && checkGroupState(service, groupId, STABLE),
+                        "The group did not initialize as expected.");
 
-        // run one consumer in the group
-        ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+                consumerGroupClosable.close();
+                TestUtils.waitForCondition(
+                        () -> checkGroupState(service, groupId, EMPTY),
+                        "The group did not become empty as expected.");
 
-        TestUtils.waitForCondition(
-            () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
-            "The group did not initialize as expected.");
+                cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId};
 
-        executor.shutdown();
-
-        TestUtils.waitForCondition(
-            () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
-            "The group did not become empty as expected.");
-
-        Map<String, Throwable> result = service.deleteGroups();
-        assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP) == null,
-            "The consumer group could not be deleted as expected");
+                try (ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs)) {
+                    String output = ToolsTestUtils.grabConsoleOutput(service2::deleteGroups);
+                    assertTrue(output.contains("Group '" + missingGroupId + "' could not be deleted due to:")
+                                    && output.contains(Errors.GROUP_ID_NOT_FOUND.message())
+                                    && output.contains("These consumer groups were deleted successfully: '" + groupId + "'"),
+                            "The consumer group deletion did not work as expected");
+                }
+            }
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdWithMixOfSuccessAndError(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
+    @ClusterTemplate("generator")
+    public void testDeleteWithMixOfSuccessAndError(ClusterInstance cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String groupId = composeGroupId(groupProtocol);
+            String topicName = composeTopicName(groupProtocol);
+            String missingGroupId = composeMissingGroupId(groupProtocol);
+            String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+            try (
+                    AutoCloseable executor = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+                    ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+            ) {
+                TestUtils.waitForCondition(
+                        () -> service.listConsumerGroups().contains(groupId) && checkGroupState(service, groupId, STABLE),
+                        "The group did not initialize as expected.");
 
-        // run one consumer in the group
-        ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+                executor.close();
 
-        TestUtils.waitForCondition(
-            () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
-            "The group did not initialize as expected.");
+                TestUtils.waitForCondition(
+                        () -> checkGroupState(service, groupId, EMPTY),
+                        "The group did not become empty as expected.");
 
-        executor.shutdown();
+                cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId};
 
-        TestUtils.waitForCondition(
-            () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
-            "The group did not become empty as expected.");
-
-        cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
-
-        ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
-
-        String output = ToolsTestUtils.grabConsoleOutput(service2::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:")
-            && output.contains(Errors.GROUP_ID_NOT_FOUND.message())
-            && output.contains("These consumer groups were deleted successfully: '" + GROUP + "'"),
-            "The consumer group deletion did not work as expected");
+                try (ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs)) {
+                    Map<String, Throwable> result = service2.deleteGroups();
+                    assertTrue(result.size() == 2 &&
+                                    result.containsKey(groupId) && result.get(groupId) == null &&
+                                    result.containsKey(missingGroupId) &&
+                                    result.get(missingGroupId).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
+                            "The consumer group deletion did not work as expected");
+                }
+            }
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithMixOfSuccessAndError(String quorum) throws Exception {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
-
-        // run one consumer in the group
-        ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
-        TestUtils.waitForCondition(
-            () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
-            "The group did not initialize as expected.");
-
-        executor.shutdown();
-
-        TestUtils.waitForCondition(
-            () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
-            "The group did not become empty as expected.");
-
-        cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
-
-        ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
-        Map<String, Throwable> result = service2.deleteGroups();
-        assertTrue(result.size() == 2 &&
-                result.containsKey(GROUP) && result.get(GROUP) == null &&
-                result.containsKey(missingGroup) &&
-                result.get(missingGroup).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The consumer group deletion did not work as expected");
+    @Test
+    public void testDeleteWithUnrecognizedNewConsumerOption() {
+        String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", "localhost:62241", "--delete", "--group", getDummyGroupId()};
+        assertThrows(OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs(cgcArgs));
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithUnrecognizedNewConsumerOption(String quorum) {
-        String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
+    private String getDummyGroupId() {
+        return composeGroupId(null);
+    }
+
+    private String composeGroupId(GroupProtocol protocol) {
+        String groupPrefix = "test.";
+        return protocol != null ? groupPrefix + protocol.name : groupPrefix + "dummy";
+    }
+
+    private String composeTopicName(GroupProtocol protocol) {
+        String topicPrefix = "foo.";
+        return protocol != null ? topicPrefix + protocol.name : topicPrefix + "dummy";
+    }
+
+    private String composeMissingGroupId(GroupProtocol protocol) {
+        String missingGroupPrefix = "missing.";
+        return protocol != null ? missingGroupPrefix + protocol.name : missingGroupPrefix + "dummy";
+    }
+
+    private AutoCloseable consumerGroupClosable(ClusterInstance cluster, GroupProtocol protocol, String groupId, String topicName) {
+        Map<String, Object> configs = composeConfigs(
+                cluster,
+                groupId,
+                protocol.name,
+                emptyMap());
+
+        return ConsumerGroupCommandTestUtils.buildConsumers(
+                1,
+                false,
+                topicName,
+                () -> new KafkaConsumer<String, String>(configs)
+        );
+    }
+
+    private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, ConsumerGroupState state) throws Exception {
+        return Objects.equals(service.collectGroupState(groupId).state, state);
+    }
+
+    private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
+        ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
+        return new ConsumerGroupCommand.ConsumerGroupService(
+                opts,
+                singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
+        );
+    }
+
+    private Map<String, Object> composeConfigs(ClusterInstance cluster, String groupId, String groupProtocol, Map<String, Object> customConfigs) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        configs.put(GROUP_ID_CONFIG, groupId);
+        configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
+        configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
+
+        configs.putAll(customConfigs);
+        return configs;
     }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index fcf86cd..e490393 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -152,8 +153,8 @@
         String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
-        Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
-        assertTrue(res.getKey().map(s -> s.contains("Dead")).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
+        assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
             "Expected the state to be 'Dead', with no members in the group '" + group + "'.");
     }
 
@@ -169,12 +170,12 @@
         String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
-        Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
-        assertTrue(res.getKey().map(s -> s.contains("Dead")).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
+        assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
             "Expected the state to be 'Dead', with no members in the group '" + group + "'.");
 
-        Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
-        assertTrue(res2.getKey().map(s -> s.contains("Dead")).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
+        assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
             "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option).");
     }
 
@@ -191,7 +192,7 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         GroupState state = service.collectGroupState(group);
-        assertTrue(Objects.equals(state.state, "Dead") && state.numMembers == 0 &&
+        assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 &&
                 state.coordinator != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator.id()).isEmpty(),
             "Expected the state to be 'Dead', with no members in the group '" + group + "'."
         );
@@ -285,13 +286,13 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
-            Optional<String> state = groupOffsets.getKey();
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
+            Optional<ConsumerGroupState> state = groupOffsets.getKey();
             Optional<Collection<PartitionAssignmentState>> assignments = groupOffsets.getValue();
 
             Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
 
-            boolean res = state.map(s -> s.contains("Stable")).orElse(false) &&
+            boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
                 assignments.isPresent() &&
                 assignments.get().stream().filter(isGrp).count() == 1;
 
@@ -321,13 +322,13 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
-            Optional<String> state = groupMembers.getKey();
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
+            Optional<ConsumerGroupState> state = groupMembers.getKey();
             Optional<Collection<MemberAssignmentState>> assignments = groupMembers.getValue();
 
             Predicate<MemberAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
 
-            boolean res = state.map(s -> s.contains("Stable")).orElse(false) &&
+            boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
                 assignments.isPresent() &&
                 assignments.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1;
 
@@ -345,7 +346,7 @@
                 !Objects.equals(assignmentState.host, ConsumerGroupCommand.MISSING_COLUMN_VALUE);
         }, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + ".");
 
-        Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
 
         if (res.getValue().isPresent()) {
             assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1,
@@ -372,7 +373,7 @@
 
         TestUtils.waitForCondition(() -> {
             GroupState state = service.collectGroupState(GROUP);
-            return Objects.equals(state.state, "Stable") &&
+            return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
                 state.numMembers == 1 &&
                 Objects.equals(state.assignmentStrategy, "range") &&
                 state.coordinator != null &&
@@ -399,7 +400,7 @@
 
         TestUtils.waitForCondition(() -> {
             GroupState state = service.collectGroupState(GROUP);
-            return Objects.equals(state.state, "Stable") &&
+            return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
                 state.numMembers == 1 &&
                 Objects.equals(state.assignmentStrategy, expectedName) &&
                 state.coordinator != null &&
@@ -445,8 +446,8 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
-            return res.getKey().map(s -> s.contains("Stable")).orElse(false)
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
                 && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, GROUP) && assignment.offset.isPresent())).orElse(false);
         }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
 
@@ -454,12 +455,12 @@
         executor.shutdown();
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
-            Optional<String> state = offsets.getKey();
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
+            Optional<ConsumerGroupState> state = offsets.getKey();
             Optional<Collection<PartitionAssignmentState>> assignments = offsets.getValue();
             List<PartitionAssignmentState> testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, GROUP)).collect(Collectors.toList());
             PartitionAssignmentState assignment = testGroupAssignments.get(0);
-            return state.map(s -> s.contains("Empty")).orElse(false) &&
+            return state.map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) &&
                 testGroupAssignments.size() == 1 &&
                 assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone
                 assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
@@ -479,8 +480,8 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
-            return res.getKey().map(s -> s.contains("Stable")).orElse(false)
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
                 && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, GROUP))).orElse(false);
         }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
 
@@ -488,8 +489,8 @@
         executor.shutdown();
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
-            return res.getKey().map(s -> s.contains("Empty")).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty();
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty();
         }, "Expected no member in describe group members results for group '" + GROUP + "'");
     }
 
@@ -506,7 +507,7 @@
 
         TestUtils.waitForCondition(() -> {
             GroupState state = service.collectGroupState(GROUP);
-            return Objects.equals(state.state, "Stable") &&
+            return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
                 state.numMembers == 1 &&
                 state.coordinator != null &&
                 brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0;
@@ -517,7 +518,7 @@
 
         TestUtils.waitForCondition(() -> {
             GroupState state = service.collectGroupState(GROUP);
-            return Objects.equals(state.state, "Empty") && state.numMembers == 0;
+            return Objects.equals(state.state, ConsumerGroupState.EMPTY) && state.numMembers == 0;
         }, "Expected the group '" + GROUP + "' to become empty after the only member leaving.");
     }
 
@@ -554,8 +555,8 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
-            return res.getKey().map(s -> s.contains("Stable")).isPresent() &&
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).isPresent() &&
                 res.getValue().isPresent() &&
                 res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1 &&
                 res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 1;
@@ -574,8 +575,8 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
-            return res.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
                 res.getValue().isPresent() &&
                 res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
                 res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 1 &&
@@ -583,8 +584,8 @@
                 res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty());
         }, "Expected rows for consumers with no assigned partitions in describe group results");
 
-        Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
-        assertTrue(res.getKey().map(s -> s.contains("Stable")).orElse(false)
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
+        assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
                 && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false),
             "Expected additional columns in verbose version of describe members");
     }
@@ -602,7 +603,7 @@
 
         TestUtils.waitForCondition(() -> {
             GroupState state = service.collectGroupState(GROUP);
-            return Objects.equals(state.state, "Stable") && state.numMembers == 2;
+            return Objects.equals(state.state, ConsumerGroupState.STABLE) && state.numMembers == 2;
         }, "Expected two consumers in describe group results");
     }
 
@@ -643,8 +644,8 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
-            return res.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
                 res.getValue().isPresent() &&
                 res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
                 res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 2 &&
@@ -666,16 +667,16 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
-            return res.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
                 res.getValue().isPresent() &&
                 res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
                 res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 2 &&
                 res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0);
         }, "Expected two rows (one row per consumer) in describe group members results.");
 
-        Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
-        assertTrue(res.getKey().map(s -> s.contains("Stable")).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
+        Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
+        assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
             "Expected additional columns in verbose version of describe members");
     }
 
@@ -694,7 +695,7 @@
 
         TestUtils.waitForCondition(() -> {
             GroupState state = service.collectGroupState(GROUP);
-            return Objects.equals(state.state, "Stable") && Objects.equals(state.group, GROUP) && state.numMembers == 2;
+            return Objects.equals(state.state, ConsumerGroupState.STABLE) && Objects.equals(state.group, GROUP) && state.numMembers == 2;
         }, "Expected a stable group with two members in describe group state result.");
     }
 
@@ -712,8 +713,8 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
-            return res.getKey().map(s -> s.contains("Empty")).orElse(false)
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+            return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false)
                 && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2;
         }, "Expected a stable group with two members in describe group state result.");
     }
@@ -812,11 +813,11 @@
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
         TestUtils.waitForCondition(() -> {
-            Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
+            Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
 
             Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
 
-            boolean res = groupOffsets.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+            boolean res = groupOffsets.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
                 groupOffsets.getValue().isPresent() &&
                 groupOffsets.getValue().get().stream().filter(isGrp).count() == 1;
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 4d805ce..e86c61e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
@@ -501,8 +502,8 @@
 
     private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService consumerGroupService, String group) throws Exception {
         TestUtils.waitForCondition(() -> {
-            String state = consumerGroupService.collectGroupState(group).state;
-            return Objects.equals(state, "Empty") || Objects.equals(state, "Dead");
+            ConsumerGroupState state = consumerGroupService.collectGroupState(group).state;
+            return Objects.equals(state, ConsumerGroupState.EMPTY) || Objects.equals(state, ConsumerGroupState.DEAD);
         }, "Expected that consumer group is inactive. Actual state: " + consumerGroupService.collectGroupState(group).state);
     }