KAFKA-16593 Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions (#15766)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 1b44541..8970d49 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -107,6 +107,8 @@
<allow pkg="kafka.server"/>
<allow pkg="kafka.zk" />
<allow pkg="org.apache.kafka.clients.admin"/>
+ <allow pkg="org.apache.kafka.clients.consumer"/>
+ <allow pkg="org.apache.kafka.coordinator.group"/>
<subpackage name="annotation">
<allow pkg="kafka.test"/>
</subpackage>
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index c04f9ec..0259314 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -21,14 +21,22 @@
import kafka.server.BrokerFeatures;
import kafka.test.annotation.ClusterTest;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+
public interface ClusterInstance {
enum ClusterType {
@@ -145,4 +153,18 @@
void startBroker(int brokerId);
void waitForReadyBrokers() throws InterruptedException;
+
+ default Set<GroupProtocol> supportedGroupProtocols() {
+ Map<String, String> serverProperties = config().serverProperties();
+ Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
+ supportedGroupProtocols.add(CLASSIC);
+
+ // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
+ if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||
+ serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "").contains("consumer")) {
+ supportedGroupProtocols.add(CONSUMER);
+ }
+
+ return Collections.unmodifiableSet(supportedGroupProtocols);
+ }
}
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 0eb94ab..1bc56df 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -27,15 +27,24 @@
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+
+
@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
}) // Set defaults for a few params in @ClusterTest(s)
@@ -135,4 +144,49 @@
public void testDefaults(ClusterInstance clusterInstance) {
Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, clusterInstance.config().metadataVersion());
}
+
+ @ClusterTests({
+ @ClusterTest(name = "enable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+ }),
+ @ClusterTest(name = "enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+ }),
+ @ClusterTest(name = "enable-new-coordinator-and-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+ }),
+ @ClusterTest(name = "enable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+ }),
+ @ClusterTest(name = "disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
+ }),
+ })
+ public void testSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
+ Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
+ supportedGroupProtocols.add(CLASSIC);
+ supportedGroupProtocols.add(CONSUMER);
+ Assertions.assertTrue(clusterInstance.supportedGroupProtocols().containsAll(supportedGroupProtocols));
+ Assertions.assertEquals(2, clusterInstance.supportedGroupProtocols().size());
+ }
+
+ @ClusterTests({
+ @ClusterTest(name = "disable-new-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+ }),
+ @ClusterTest(name = "disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+ }),
+ @ClusterTest(name = "disable-new-coordinator-and-disable-new-consumer-rebalance-coordinator", clusterType = Type.ALL, serverProperties = {
+ @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
+ @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
+ }),
+ })
+ public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
+ Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
+ Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size());
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 92fa463..27151cc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -276,7 +276,7 @@
return new ArrayList<>(result.all().get());
}
- private boolean shouldPrintMemberState(String group, Optional<String> state, Optional<Integer> numRows) {
+ private boolean shouldPrintMemberState(String group, Optional<ConsumerGroupState> state, Optional<Integer> numRows) {
// numRows contains the number of data rows, if any, compiled from the API call in the caller method.
// if it's undefined or 0, there is no relevant group information to display.
if (!numRows.isPresent()) {
@@ -286,37 +286,37 @@
int num = numRows.get();
- String state0 = state.orElse("NONE");
+ ConsumerGroupState state0 = state.orElse(ConsumerGroupState.UNKNOWN);
switch (state0) {
- case "Dead":
+ case DEAD:
printError("Consumer group '" + group + "' does not exist.", Optional.empty());
break;
- case "Empty":
+ case EMPTY:
System.err.println("\nConsumer group '" + group + "' has no active members.");
break;
- case "PreparingRebalance":
- case "CompletingRebalance":
- case "Assigning":
- case "Reconciling":
+ case PREPARING_REBALANCE:
+ case COMPLETING_REBALANCE:
+ case ASSIGNING:
+ case RECONCILING:
System.err.println("\nWarning: Consumer group '" + group + "' is rebalancing.");
break;
- case "Stable":
+ case STABLE:
break;
default:
// the control should never reach here
throw new KafkaException("Expected a valid consumer group state, but found '" + state0 + "'.");
}
- return !state0.contains("Dead") && num > 0;
+ return !state0.equals(ConsumerGroupState.DEAD) && num > 0;
}
private Optional<Integer> size(Optional<? extends Collection<?>> colOpt) {
return colOpt.map(Collection::size);
}
- private void printOffsets(Map<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
+ private void printOffsets(Map<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
offsets.forEach((groupId, tuple) -> {
- Optional<String> state = tuple.getKey();
+ Optional<ConsumerGroupState> state = tuple.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = tuple.getValue();
if (shouldPrintMemberState(groupId, state, size(assignments))) {
@@ -357,9 +357,9 @@
return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
}
- private void printMembers(Map<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
+ private void printMembers(Map<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
members.forEach((groupId, tuple) -> {
- Optional<String> state = tuple.getKey();
+ Optional<ConsumerGroupState> state = tuple.getKey();
Optional<Collection<MemberAssignmentState>> assignments = tuple.getValue();
int maxGroupLen = 15, maxConsumerIdLen = 15, maxGroupInstanceIdLen = 17, maxHostLen = 15, maxClientIdLen = 15;
boolean includeGroupInstanceId = false;
@@ -430,7 +430,7 @@
String format = "\n%" + -coordinatorColLen + "s %-25s %-20s %-15s %s";
System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
- System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers);
+ System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state.toString(), state.numMembers);
System.out.println();
}
});
@@ -446,11 +446,11 @@
long subActions = Stream.of(membersOptPresent, offsetsOptPresent, stateOptPresent).filter(x -> x).count();
if (subActions == 0 || offsetsOptPresent) {
- TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets
+ TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets
= collectGroupsOffsets(groupIds);
printOffsets(offsets);
} else if (membersOptPresent) {
- TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members
+ TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> members
= collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt));
printMembers(members, opts.options.has(opts.verboseOpt));
} else {
@@ -684,16 +684,16 @@
/**
* Returns the state of the specified consumer group and partition assignment states
*/
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
}
/**
* Returns states of the specified consumer groups and partition assignment states
*/
- TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
+ TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
- TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
+ TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
consumerGroups.forEach((groupId, consumerGroup) -> {
ConsumerGroupState state = consumerGroup.state();
@@ -735,22 +735,22 @@
rowsWithConsumer.addAll(rowsWithoutConsumer);
- groupOffsets.put(groupId, new SimpleImmutableEntry<>(Optional.of(state.toString()), Optional.of(rowsWithConsumer)));
+ groupOffsets.put(groupId, new SimpleImmutableEntry<>(Optional.of(state), Optional.of(rowsWithConsumer)));
});
return groupOffsets;
}
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
return collectGroupsMembers(Collections.singleton(groupId), verbose).get(groupId);
}
- TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
+ TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
- TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
+ TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
consumerGroups.forEach((groupId, consumerGroup) -> {
- String state = consumerGroup.state().toString();
+ ConsumerGroupState state = consumerGroup.state();
List<MemberAssignmentState> memberAssignmentStates = consumerGroup.members().stream().map(consumer ->
new MemberAssignmentState(
groupId,
@@ -778,7 +778,7 @@
groupId,
groupDescription.coordinator(),
groupDescription.partitionAssignor(),
- groupDescription.state().toString(),
+ groupDescription.state(),
groupDescription.members().size()
)));
return res;
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java
index 04a3b2e..18cd3f4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java
@@ -16,16 +16,17 @@
*/
package org.apache.kafka.tools.consumer.group;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.Node;
class GroupState {
final String group;
final Node coordinator;
final String assignmentStrategy;
- final String state;
+ final ConsumerGroupState state;
final int numMembers;
- GroupState(String group, Node coordinator, String assignmentStrategy, String state, int numMembers) {
+ GroupState(String group, Node coordinator, String assignmentStrategy, ConsumerGroupState state, int numMembers) {
this.group = group;
this.coordinator = coordinator;
this.assignmentStrategy = assignmentStrategy;
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
new file mode 100644
index 0000000..1ffad2b
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterGenerator;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static java.util.Collections.singleton;
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+
+class ConsumerGroupCommandTestUtils {
+
+ private ConsumerGroupCommandTestUtils() {
+ }
+
+ static void generator(ClusterGenerator clusterGenerator) {
+ Map<String, String> serverProperties = new HashMap<>();
+ serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
+ serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+ serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
+
+ ClusterConfig zk = ClusterConfig.defaultBuilder()
+ .setType(ZK)
+ .setServerProperties(serverProperties)
+ .build();
+ clusterGenerator.accept(zk);
+
+ ClusterConfig raftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
+ .setType(KRAFT)
+ .setServerProperties(serverProperties)
+ .build();
+ clusterGenerator.accept(raftWithLegacyCoordinator);
+
+ ClusterConfig combinedKRaftWithLegacyCoordinator = ClusterConfig.defaultBuilder()
+ .setType(CO_KRAFT)
+ .setServerProperties(serverProperties)
+ .build();
+ clusterGenerator.accept(combinedKRaftWithLegacyCoordinator);
+
+ // Following are test case config with new group coordinator
+ serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
+
+ ClusterConfig raftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
+ .setType(KRAFT)
+ .setName("newGroupCoordinator")
+ .setServerProperties(serverProperties)
+ .build();
+ clusterGenerator.accept(raftWithNewGroupCoordinator);
+
+ ClusterConfig combinedKRaftWithNewGroupCoordinator = ClusterConfig.defaultBuilder()
+ .setType(CO_KRAFT)
+ .setName("newGroupCoordinator")
+ .setServerProperties(serverProperties)
+ .build();
+ clusterGenerator.accept(combinedKRaftWithNewGroupCoordinator);
+ }
+
+ static <T> AutoCloseable buildConsumers(int numberOfConsumers,
+ boolean syncCommit,
+ String topic,
+ Supplier<KafkaConsumer<T, T>> consumerSupplier) {
+ List<KafkaConsumer<T, T>> consumers = new ArrayList<>(numberOfConsumers);
+ ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers);
+ AtomicBoolean closed = new AtomicBoolean(false);
+ final AutoCloseable closeable = () -> releaseConsumers(closed, consumers, executor);
+ try {
+ for (int i = 0; i < numberOfConsumers; i++) {
+ KafkaConsumer<T, T> consumer = consumerSupplier.get();
+ consumers.add(consumer);
+ executor.execute(() -> initConsumer(topic, syncCommit, consumer, closed));
+ }
+ return closeable;
+ } catch (Throwable e) {
+ Utils.closeQuietly(closeable, "Release Consumer");
+ throw e;
+ }
+ }
+
+ private static <T> void releaseConsumers(AtomicBoolean closed, List<KafkaConsumer<T, T>> consumers, ExecutorService executor) throws InterruptedException {
+ closed.set(true);
+ consumers.forEach(KafkaConsumer::wakeup);
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
+
+ private static <T> void initConsumer(String topic, boolean syncCommit, KafkaConsumer<T, T> consumer, AtomicBoolean closed) {
+ try (KafkaConsumer<T, T> kafkaConsumer = consumer) {
+ kafkaConsumer.subscribe(singleton(topic));
+ while (!closed.get()) {
+ consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+ if (syncCommit)
+ consumer.commitSync();
+ }
+ } catch (WakeupException e) {
+ // OK
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index a30ce40..c0d4bf9 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -90,8 +90,8 @@
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
- assertEquals(Optional.of("Stable"), statesAndAssignments.getKey());
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+ assertEquals(Optional.of(ConsumerGroupState.STABLE), statesAndAssignments.getKey());
assertTrue(statesAndAssignments.getValue().isPresent());
assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.getValue().get().size());
@@ -163,8 +163,8 @@
)).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
- Optional<String> state = statesAndAssignments.getKey();
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+ Optional<ConsumerGroupState> state = statesAndAssignments.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = statesAndAssignments.getValue();
Map<TopicPartition, Optional<Long>> returnedOffsets = assignments.map(results ->
@@ -182,7 +182,7 @@
expectedOffsets.put(testTopicPartition4, Optional.of(100L));
expectedOffsets.put(testTopicPartition5, Optional.empty());
- assertEquals(Optional.of("Stable"), state);
+ assertEquals(Optional.of(ConsumerGroupState.STABLE), state);
assertEquals(expectedOffsets, returnedOffsets);
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any());
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
index f50f395..c37a9a6 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
@@ -17,279 +17,326 @@
package org.apache.kafka.tools.consumer.group;
import joptsimple.OptionException;
+import kafka.test.ClusterGenerator;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTemplate;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteWithTopicOption(String quorum) {
- createOffsetsTopic(listenerName(), new Properties());
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
- assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
+
+@ExtendWith(value = ClusterTestExtensions.class)
+public class DeleteConsumerGroupsTest {
+
+ private static void generator(ClusterGenerator clusterGenerator) {
+ ConsumerGroupCommandTestUtils.generator(clusterGenerator);
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteCmdNonExistingGroup(String quorum) {
- createOffsetsTopic(listenerName(), new Properties());
- String missingGroup = "missing.group";
-
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
- String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
- assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
- "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+ @Test
+ public void testDeleteWithTopicOption() {
+ String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:62241", "--delete", "--group", getDummyGroupId(), "--topic"};
+ assertThrows(OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteNonExistingGroup(String quorum) {
- createOffsetsTopic(listenerName(), new Properties());
- String missingGroup = "missing.group";
-
- // note the group to be deleted is a different (non-existing) group
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
- Map<String, Throwable> result = service.deleteGroups();
- assertTrue(result.size() == 1 && result.containsKey(missingGroup) && result.get(missingGroup).getCause() instanceof GroupIdNotFoundException,
- "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+ @ClusterTemplate("generator")
+ public void testDeleteCmdNonExistingGroup(ClusterInstance cluster) {
+ String missingGroupId = getDummyGroupId();
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
+ try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
+ String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
+ assertTrue(output.contains("Group '" + missingGroupId + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
+ "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteCmdNonEmptyGroup(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
-
- // run one consumer in the group
- addConsumerGroupExecutor(1);
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
- TestUtils.waitForCondition(
- () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1,
- "The group did not initialize as expected."
- );
-
- String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
- assertTrue(output.contains("Group '" + GROUP + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
- "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
+ @ClusterTemplate("generator")
+ public void testDeleteNonExistingGroup(ClusterInstance cluster) {
+ String missingGroupId = getDummyGroupId();
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
+ try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
+ Map<String, Throwable> result = service.deleteGroups();
+ assertEquals(1, result.size());
+ assertNotNull(result.get(missingGroupId));
+ assertInstanceOf(GroupIdNotFoundException.class,
+ result.get(missingGroupId).getCause(),
+ "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteNonEmptyGroup(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
+ @ClusterTemplate("generator")
+ public void testDeleteNonEmptyGroup(ClusterInstance cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String groupId = composeGroupId(groupProtocol);
+ String topicName = composeTopicName(groupProtocol);
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+ try (
+ AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+ ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+ ) {
+ TestUtils.waitForCondition(
+ () -> service.collectGroupMembers(groupId, false).getValue().get().size() == 1,
+ "The group did not initialize as expected."
+ );
- // run one consumer in the group
- addConsumerGroupExecutor(1);
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+ String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
+ Map<String, Throwable> result = service.deleteGroups();
- TestUtils.waitForCondition(
- () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1,
- "The group did not initialize as expected."
- );
+ assertTrue(output.contains("Group '" + groupId + "' could not be deleted due to:") && output.contains(Errors.NON_EMPTY_GROUP.message()),
+ "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Output was: (" + output + ")");
- Map<String, Throwable> result = service.deleteGroups();
- assertNotNull(result.get(GROUP),
- "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
- assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP).getCause() instanceof GroupNotEmptyException,
- "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
+ assertNotNull(result.get(groupId),
+ "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
+
+ assertEquals(1, result.size());
+ assertNotNull(result.get(groupId));
+ assertInstanceOf(GroupNotEmptyException.class,
+ result.get(groupId).getCause(),
+ "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting consumer group. Result was:(" + result + ")");
+ }
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteCmdEmptyGroup(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
+ @ClusterTemplate("generator")
+ void testDeleteEmptyGroup(ClusterInstance cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String groupId = composeGroupId(groupProtocol);
+ String topicName = composeTopicName(groupProtocol);
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+ try (
+ AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+ ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+ ) {
+ TestUtils.waitForCondition(
+ () -> service.listConsumerGroups().contains(groupId) && checkGroupState(service, groupId, STABLE),
+ "The group did not initialize as expected."
+ );
- // run one consumer in the group
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+ consumerGroupCloseable.close();
- TestUtils.waitForCondition(
- () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
- "The group did not initialize as expected."
- );
+ TestUtils.waitForCondition(
+ () -> checkGroupState(service, groupId, EMPTY),
+ "The group did not become empty as expected."
+ );
- executor.shutdown();
+ Map<String, Throwable> result = new HashMap<>();
+ String output = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
- TestUtils.waitForCondition(
- () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
- "The group did not become empty as expected."
- );
-
- String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
- assertTrue(output.contains("Deletion of requested consumer groups ('" + GROUP + "') was successful."),
- "The consumer group could not be deleted as expected");
+ assertTrue(output.contains("Deletion of requested consumer groups ('" + groupId + "') was successful."),
+ "The consumer group could not be deleted as expected");
+ assertEquals(1, result.size());
+ assertTrue(result.containsKey(groupId));
+ assertNull(result.get(groupId), "The consumer group could not be deleted as expected");
+ }
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteCmdAllGroups(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
+ @ClusterTemplate("generator")
+ public void testDeleteCmdAllGroups(ClusterInstance cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String topicName = composeTopicName(groupProtocol);
+ // Create 3 groups with 1 consumer each
+ Map<String, AutoCloseable> groupIdToExecutor = IntStream.rangeClosed(1, 3)
+ .mapToObj(i -> composeGroupId(groupProtocol) + i)
+ .collect(Collectors.toMap(Function.identity(), group -> consumerGroupClosable(cluster, groupProtocol, group, topicName)));
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--all-groups"};
- // Create 3 groups with 1 consumer per each
- Map<String, ConsumerGroupExecutor> groups = IntStream.rangeClosed(1, 3).mapToObj(i -> GROUP + i).collect(Collectors.toMap(
- Function.identity(),
- group -> addConsumerGroupExecutor(1, TOPIC, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, GroupProtocol.CLASSIC.name)
- ));
+ try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
+ TestUtils.waitForCondition(() ->
+ new HashSet<>(service.listConsumerGroups()).equals(groupIdToExecutor.keySet()) &&
+ groupIdToExecutor.keySet().stream().allMatch(groupId -> assertDoesNotThrow(() -> checkGroupState(service, groupId, STABLE))),
+ "The group did not initialize as expected.");
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--all-groups"};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
- TestUtils.waitForCondition(() ->
- new HashSet<>(service.listConsumerGroups()).equals(groups.keySet()) &&
- groups.keySet().stream().allMatch(groupId -> {
- try {
- return Objects.equals(service.collectGroupState(groupId).state, "Stable");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }),
- "The group did not initialize as expected.");
-
- // Shutdown consumers to empty out groups
- groups.values().forEach(AbstractConsumerGroupExecutor::shutdown);
-
- TestUtils.waitForCondition(() ->
- groups.keySet().stream().allMatch(groupId -> {
- try {
- return Objects.equals(service.collectGroupState(groupId).state, "Empty");
- } catch (Exception e) {
- throw new RuntimeException(e);
+ // Shutdown consumers to empty out groups
+ for (AutoCloseable consumerGroupExecutor : groupIdToExecutor.values()) {
+ consumerGroupExecutor.close();
}
- }),
- "The group did not become empty as expected.");
- String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups).trim();
- Set<String> expectedGroupsForDeletion = groups.keySet();
- Set<String> deletedGroupsGrepped = Arrays.stream(output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(","))
- .map(str -> str.replaceAll("'", "").trim()).collect(Collectors.toSet());
+ TestUtils.waitForCondition(() ->
+ groupIdToExecutor.keySet().stream().allMatch(groupId -> assertDoesNotThrow(() -> checkGroupState(service, groupId, EMPTY))),
+ "The group did not become empty as expected.");
- assertTrue(output.matches("Deletion of requested consumer groups (.*) was successful.")
- && Objects.equals(deletedGroupsGrepped, expectedGroupsForDeletion),
- "The consumer group(s) could not be deleted as expected");
+ String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups).trim();
+ Set<String> expectedGroupsForDeletion = groupIdToExecutor.keySet();
+ Set<String> deletedGroupsGrepped = Arrays.stream(output.substring(output.indexOf('(') + 1, output.indexOf(')')).split(","))
+ .map(str -> str.replaceAll("'", "").trim())
+ .collect(Collectors.toSet());
+
+ assertTrue(output.matches("Deletion of requested consumer groups (.*) was successful.")
+ && Objects.equals(deletedGroupsGrepped, expectedGroupsForDeletion),
+ "The consumer group(s) could not be deleted as expected");
+ }
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteEmptyGroup(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
+ @ClusterTemplate("generator")
+ public void testDeleteCmdWithMixOfSuccessAndError(ClusterInstance cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String groupId = composeGroupId(groupProtocol);
+ String topicName = composeTopicName(groupProtocol);
+ String missingGroupId = composeMissingGroupId(groupProtocol);
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+ try (
+ AutoCloseable consumerGroupClosable = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+ ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+ ) {
+ TestUtils.waitForCondition(
+ () -> service.listConsumerGroups().contains(groupId) && checkGroupState(service, groupId, STABLE),
+ "The group did not initialize as expected.");
- // run one consumer in the group
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+ consumerGroupClosable.close();
+ TestUtils.waitForCondition(
+ () -> checkGroupState(service, groupId, EMPTY),
+ "The group did not become empty as expected.");
- TestUtils.waitForCondition(
- () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
- "The group did not initialize as expected.");
+ cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId};
- executor.shutdown();
-
- TestUtils.waitForCondition(
- () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
- "The group did not become empty as expected.");
-
- Map<String, Throwable> result = service.deleteGroups();
- assertTrue(result.size() == 1 && result.containsKey(GROUP) && result.get(GROUP) == null,
- "The consumer group could not be deleted as expected");
+ try (ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs)) {
+ String output = ToolsTestUtils.grabConsoleOutput(service2::deleteGroups);
+ assertTrue(output.contains("Group '" + missingGroupId + "' could not be deleted due to:")
+ && output.contains(Errors.GROUP_ID_NOT_FOUND.message())
+ && output.contains("These consumer groups were deleted successfully: '" + groupId + "'"),
+ "The consumer group deletion did not work as expected");
+ }
+ }
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteCmdWithMixOfSuccessAndError(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
- String missingGroup = "missing.group";
+ @ClusterTemplate("generator")
+ public void testDeleteWithMixOfSuccessAndError(ClusterInstance cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String groupId = composeGroupId(groupProtocol);
+ String topicName = composeTopicName(groupProtocol);
+ String missingGroupId = composeMissingGroupId(groupProtocol);
+ String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId};
+ try (
+ AutoCloseable executor = consumerGroupClosable(cluster, groupProtocol, groupId, topicName);
+ ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)
+ ) {
+ TestUtils.waitForCondition(
+ () -> service.listConsumerGroups().contains(groupId) && checkGroupState(service, groupId, STABLE),
+ "The group did not initialize as expected.");
- // run one consumer in the group
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+ executor.close();
- TestUtils.waitForCondition(
- () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
- "The group did not initialize as expected.");
+ TestUtils.waitForCondition(
+ () -> checkGroupState(service, groupId, EMPTY),
+ "The group did not become empty as expected.");
- executor.shutdown();
+ cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", groupId, "--group", missingGroupId};
- TestUtils.waitForCondition(
- () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
- "The group did not become empty as expected.");
-
- cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
-
- ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
-
- String output = ToolsTestUtils.grabConsoleOutput(service2::deleteGroups);
- assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:")
- && output.contains(Errors.GROUP_ID_NOT_FOUND.message())
- && output.contains("These consumer groups were deleted successfully: '" + GROUP + "'"),
- "The consumer group deletion did not work as expected");
+ try (ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs)) {
+ Map<String, Throwable> result = service2.deleteGroups();
+ assertTrue(result.size() == 2 &&
+ result.containsKey(groupId) && result.get(groupId) == null &&
+ result.containsKey(missingGroupId) &&
+ result.get(missingGroupId).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
+ "The consumer group deletion did not work as expected");
+ }
+ }
+ }
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteWithMixOfSuccessAndError(String quorum) throws Exception {
- createOffsetsTopic(listenerName(), new Properties());
- String missingGroup = "missing.group";
-
- // run one consumer in the group
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1);
- String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
- TestUtils.waitForCondition(
- () -> service.listConsumerGroups().contains(GROUP) && Objects.equals(service.collectGroupState(GROUP).state, "Stable"),
- "The group did not initialize as expected.");
-
- executor.shutdown();
-
- TestUtils.waitForCondition(
- () -> Objects.equals(service.collectGroupState(GROUP).state, "Empty"),
- "The group did not become empty as expected.");
-
- cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--group", missingGroup};
-
- ConsumerGroupCommand.ConsumerGroupService service2 = getConsumerGroupService(cgcArgs);
- Map<String, Throwable> result = service2.deleteGroups();
- assertTrue(result.size() == 2 &&
- result.containsKey(GROUP) && result.get(GROUP) == null &&
- result.containsKey(missingGroup) &&
- result.get(missingGroup).getMessage().contains(Errors.GROUP_ID_NOT_FOUND.message()),
- "The consumer group deletion did not work as expected");
+ @Test
+ public void testDeleteWithUnrecognizedNewConsumerOption() {
+ String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", "localhost:62241", "--delete", "--group", getDummyGroupId()};
+ assertThrows(OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testDeleteWithUnrecognizedNewConsumerOption(String quorum) {
- String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP};
- assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
+ private String getDummyGroupId() {
+ return composeGroupId(null);
+ }
+
+ private String composeGroupId(GroupProtocol protocol) {
+ String groupPrefix = "test.";
+ return protocol != null ? groupPrefix + protocol.name : groupPrefix + "dummy";
+ }
+
+ private String composeTopicName(GroupProtocol protocol) {
+ String topicPrefix = "foo.";
+ return protocol != null ? topicPrefix + protocol.name : topicPrefix + "dummy";
+ }
+
+ private String composeMissingGroupId(GroupProtocol protocol) {
+ String missingGroupPrefix = "missing.";
+ return protocol != null ? missingGroupPrefix + protocol.name : missingGroupPrefix + "dummy";
+ }
+
+ private AutoCloseable consumerGroupClosable(ClusterInstance cluster, GroupProtocol protocol, String groupId, String topicName) {
+ Map<String, Object> configs = composeConfigs(
+ cluster,
+ groupId,
+ protocol.name,
+ emptyMap());
+
+ return ConsumerGroupCommandTestUtils.buildConsumers(
+ 1,
+ false,
+ topicName,
+ () -> new KafkaConsumer<String, String>(configs)
+ );
+ }
+
+ private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, ConsumerGroupState state) throws Exception {
+ return Objects.equals(service.collectGroupState(groupId).state, state);
+ }
+
+ private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
+ ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
+ return new ConsumerGroupCommand.ConsumerGroupService(
+ opts,
+ singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
+ );
+ }
+
+ private Map<String, Object> composeConfigs(ClusterInstance cluster, String groupId, String groupProtocol, Map<String, Object> customConfigs) {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ configs.put(GROUP_ID_CONFIG, groupId);
+ configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
+ configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
+
+ configs.putAll(customConfigs);
+ return configs;
}
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index fcf86cd..e490393 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.AppInfoParser;
@@ -152,8 +153,8 @@
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
- assertTrue(res.getKey().map(s -> s.contains("Dead")).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
+ assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + group + "'.");
}
@@ -169,12 +170,12 @@
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group};
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
- assertTrue(res.getKey().map(s -> s.contains("Dead")).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
+ assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + group + "'.");
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
- assertTrue(res2.getKey().map(s -> s.contains("Dead")).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(group, true);
+ assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option).");
}
@@ -191,7 +192,7 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
GroupState state = service.collectGroupState(group);
- assertTrue(Objects.equals(state.state, "Dead") && state.numMembers == 0 &&
+ assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 &&
state.coordinator != null && !brokers().filter(s -> s.config().brokerId() == state.coordinator.id()).isEmpty(),
"Expected the state to be 'Dead', with no members in the group '" + group + "'."
);
@@ -285,13 +286,13 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
- Optional<String> state = groupOffsets.getKey();
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
+ Optional<ConsumerGroupState> state = groupOffsets.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = groupOffsets.getValue();
Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
- boolean res = state.map(s -> s.contains("Stable")).orElse(false) &&
+ boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
assignments.isPresent() &&
assignments.get().stream().filter(isGrp).count() == 1;
@@ -321,13 +322,13 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
- Optional<String> state = groupMembers.getKey();
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> groupMembers = service.collectGroupMembers(GROUP, false);
+ Optional<ConsumerGroupState> state = groupMembers.getKey();
Optional<Collection<MemberAssignmentState>> assignments = groupMembers.getValue();
Predicate<MemberAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
- boolean res = state.map(s -> s.contains("Stable")).orElse(false) &&
+ boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
assignments.isPresent() &&
assignments.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1;
@@ -345,7 +346,7 @@
!Objects.equals(assignmentState.host, ConsumerGroupCommand.MISSING_COLUMN_VALUE);
}, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + ".");
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
if (res.getValue().isPresent()) {
assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1,
@@ -372,7 +373,7 @@
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(GROUP);
- return Objects.equals(state.state, "Stable") &&
+ return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
state.numMembers == 1 &&
Objects.equals(state.assignmentStrategy, "range") &&
state.coordinator != null &&
@@ -399,7 +400,7 @@
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(GROUP);
- return Objects.equals(state.state, "Stable") &&
+ return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
state.numMembers == 1 &&
Objects.equals(state.assignmentStrategy, expectedName) &&
state.coordinator != null &&
@@ -445,8 +446,8 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
- return res.getKey().map(s -> s.contains("Stable")).orElse(false)
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
&& res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, GROUP) && assignment.offset.isPresent())).orElse(false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
@@ -454,12 +455,12 @@
executor.shutdown();
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
- Optional<String> state = offsets.getKey();
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(GROUP);
+ Optional<ConsumerGroupState> state = offsets.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = offsets.getValue();
List<PartitionAssignmentState> testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, GROUP)).collect(Collectors.toList());
PartitionAssignmentState assignment = testGroupAssignments.get(0);
- return state.map(s -> s.contains("Empty")).orElse(false) &&
+ return state.map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) &&
testGroupAssignments.size() == 1 &&
assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone
assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
@@ -479,8 +480,8 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
- return res.getKey().map(s -> s.contains("Stable")).orElse(false)
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
&& res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, GROUP))).orElse(false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
@@ -488,8 +489,8 @@
executor.shutdown();
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
- return res.getKey().map(s -> s.contains("Empty")).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty();
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty();
}, "Expected no member in describe group members results for group '" + GROUP + "'");
}
@@ -506,7 +507,7 @@
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(GROUP);
- return Objects.equals(state.state, "Stable") &&
+ return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
state.numMembers == 1 &&
state.coordinator != null &&
brokers().count(s -> s.config().brokerId() == state.coordinator.id()) > 0;
@@ -517,7 +518,7 @@
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(GROUP);
- return Objects.equals(state.state, "Empty") && state.numMembers == 0;
+ return Objects.equals(state.state, ConsumerGroupState.EMPTY) && state.numMembers == 0;
}, "Expected the group '" + GROUP + "' to become empty after the only member leaving.");
}
@@ -554,8 +555,8 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
- return res.getKey().map(s -> s.contains("Stable")).isPresent() &&
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).isPresent() &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 1;
@@ -574,8 +575,8 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
- return res.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 1 &&
@@ -583,8 +584,8 @@
res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty());
}, "Expected rows for consumers with no assigned partitions in describe group results");
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
- assertTrue(res.getKey().map(s -> s.contains("Stable")).orElse(false)
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
+ assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
&& res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false),
"Expected additional columns in verbose version of describe members");
}
@@ -602,7 +603,7 @@
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(GROUP);
- return Objects.equals(state.state, "Stable") && state.numMembers == 2;
+ return Objects.equals(state.state, ConsumerGroupState.STABLE) && state.numMembers == 2;
}, "Expected two consumers in describe group results");
}
@@ -643,8 +644,8 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
- return res.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 2 &&
@@ -666,16 +667,16 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
- return res.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, false);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 2 &&
res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0);
}, "Expected two rows (one row per consumer) in describe group members results.");
- Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
- assertTrue(res.getKey().map(s -> s.contains("Stable")).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(GROUP, true);
+ assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
"Expected additional columns in verbose version of describe members");
}
@@ -694,7 +695,7 @@
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(GROUP);
- return Objects.equals(state.state, "Stable") && Objects.equals(state.group, GROUP) && state.numMembers == 2;
+ return Objects.equals(state.state, ConsumerGroupState.STABLE) && Objects.equals(state.group, GROUP) && state.numMembers == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@@ -712,8 +713,8 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
- return res.getKey().map(s -> s.contains("Empty")).orElse(false)
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(GROUP);
+ return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false)
&& res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@@ -812,11 +813,11 @@
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
TestUtils.waitForCondition(() -> {
- Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
+ Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(GROUP);
Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, GROUP);
- boolean res = groupOffsets.getKey().map(s -> s.contains("Stable")).orElse(false) &&
+ boolean res = groupOffsets.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
groupOffsets.getValue().isPresent() &&
groupOffsets.getValue().get().stream().filter(isGrp).count() == 1;
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 4d805ce..e86c61e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -501,8 +502,8 @@
private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService consumerGroupService, String group) throws Exception {
TestUtils.waitForCondition(() -> {
- String state = consumerGroupService.collectGroupState(group).state;
- return Objects.equals(state, "Empty") || Objects.equals(state, "Dead");
+ ConsumerGroupState state = consumerGroupService.collectGroupState(group).state;
+ return Objects.equals(state, ConsumerGroupState.EMPTY) || Objects.equals(state, ConsumerGroupState.DEAD);
}, "Expected that consumer group is inactive. Actual state: " + consumerGroupService.collectGroupState(group).state);
}