KAFKA-16230: Update verifiable_consumer.py to support KIP-848’s group protocol config (#15328)
The Python VerifiableConsumer now passes in the --group-protocol and --group-remote-assignor command line arguments to VerifiableConsumer if the node is running 3.7.0+ and using the new consumer group.protocol.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 93d9446..e1155c1 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -21,7 +21,7 @@
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_client import VerifiableClientMixin
-from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_0_10_0_0
+from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0
class ConsumerState:
@@ -167,7 +167,7 @@
def __init__(self, context, num_nodes, kafka, topic, group_id,
static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
- assignment_strategy=None,
+ assignment_strategy=None, group_protocol=None, group_remote_assignor=None,
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
"""
@@ -184,6 +184,8 @@
self.session_timeout_sec = session_timeout_sec
self.enable_autocommit = enable_autocommit
self.assignment_strategy = assignment_strategy
+ self.group_protocol = group_protocol
+ self.group_remote_assignor = group_remote_assignor
self.prop_file = ""
self.stop_timeout_sec = stop_timeout_sec
self.on_record_consumed = on_record_consumed
@@ -306,8 +308,20 @@
# if `None` is passed as the argument value
cmd += " --group-instance-id None"
- if self.assignment_strategy:
- cmd += " --assignment-strategy %s" % self.assignment_strategy
+ # 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
+ # The two implementations use slightly different configuration, hence these arguments are conditional.
+ #
+ # See the Java class/method VerifiableConsumer.createFromArgs() for how the command line arguments are
+ # parsed and used as configuration in the runner.
+ if node.version >= V_3_7_0 and self.is_consumer_group_protocol_enabled():
+ cmd += " --group-protocol %s" % self.group_protocol
+
+ if self.group_remote_assignor:
+ cmd += " --group-remote-assignor %s" % self.group_remote_assignor
+ else:
+ # Either we're an older consumer version or we're using the old consumer group protocol.
+ if self.assignment_strategy:
+ cmd += " --assignment-strategy %s" % self.assignment_strategy
if self.enable_autocommit:
cmd += " --enable-autocommit "
@@ -416,3 +430,6 @@
with self.lock:
return [handler.node for handler in self.event_handlers.values()
if handler.state != ConsumerState.Dead]
+
+ def is_consumer_group_protocol_enabled(self):
+ return self.group_protocol and self.group_protocol.upper() == "CONSUMER"
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 5fe66a5..afff2ce 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -533,11 +533,20 @@
.action(store())
.required(false)
.type(String.class)
- .setDefault(GroupProtocol.CLASSIC.name)
+ .setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
.metavar("GROUP_PROTOCOL")
.dest("groupProtocol")
.help(String.format("Group protocol (must be one of %s)", Utils.join(GroupProtocol.values(), ", ")));
+ parser.addArgument("--group-remote-assignor")
+ .action(store())
+ .required(false)
+ .type(String.class)
+ .setDefault(ConsumerConfig.DEFAULT_GROUP_REMOTE_ASSIGNOR)
+ .metavar("GROUP_REMOTE_ASSIGNOR")
+ .dest("groupRemoteAssignor")
+ .help(String.format("Group remote assignor; only used if the group protocol is %s", GroupProtocol.CONSUMER.name()));
+
parser.addArgument("--group-id")
.action(store())
.required(true)
@@ -599,7 +608,7 @@
.setDefault(RangeAssignor.class.getName())
.type(String.class)
.dest("assignmentStrategy")
- .help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")");
+ .help(String.format("Set assignment strategy (e.g. %s); only used if the group protocol is %s", RoundRobinAssignor.class.getName(), GroupProtocol.CLASSIC.name()));
parser.addArgument("--consumer.config")
.action(store())
@@ -627,7 +636,21 @@
}
}
- consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, res.getString("groupProtocol"));
+ String groupProtocol = res.getString("groupProtocol");
+
+ // 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
+ // The two implementations use slightly different configuration, hence these arguments are conditional.
+ //
+ // See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the
+ // command line arguments are passed in by the system test framework.
+ if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
+ consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
+ consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, res.getString("groupRemoteAssignor"));
+ } else {
+ // This means we're using the old consumer group protocol.
+ consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
+ }
+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
String groupInstanceId = res.getString("groupInstanceId");
@@ -650,7 +673,6 @@
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
- consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);