MINOR: Fix failed e2e compatibility_test_new_broker_test and upgrade_test.py (#20471)
#20390 Replace the -`-producer.config` for the verifiable producer and
`--consumer.config` option by `--command-config` for the verifiable
consumer. However, for e2e tests targeting older broker versions, the
original configuration should still be used.
Fix the following tests:
`consumer_protocol_migration_test.py`、`compatibility_test_new_broker_test.py`
and `upgrade_test.py`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 99c403c..9a8a294 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -22,7 +22,7 @@
from kafkatest.services.kafka import TopicPartition, consumer_group
from kafkatest.services.kafka.util import get_log4j_config_param, get_log4j_config_for_tools
from kafkatest.services.verifiable_client import VerifiableClientMixin
-from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
+from kafkatest.version import get_version, DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
class ConsumerState:
@@ -424,7 +424,12 @@
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
- cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
+ version = get_version(node)
+ if version.supports_command_config():
+ cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
+ else:
+ cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
+
cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
return cmd
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 049de4c..1216842 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -249,7 +249,10 @@
if self.repeating_keys is not None:
cmd += " --repeating-keys %s " % str(self.repeating_keys)
- cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
+ if version.supports_command_config():
+ cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
+ else:
+ cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
return cmd
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 350f833..e4d600a 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -97,6 +97,13 @@
def supports_feature_command(self):
return self >= V_3_8_0
+ def supports_command_config(self):
+ # According to KIP-1147, --producer.config and --consumer.config have been deprecated and will be removed in future versions
+ # For backward compatibility, we select the configuration based on node version:
+ # - For versions 4.2.0 and above, use --command-config
+ # - For older versions, continue using --producer.config or --consumer.config
+ return self >= V_4_2_0
+
def get_version(node=None):
"""Return the version attached to the given node.
Default to DEV_BRANCH if node or node.version is undefined (aka None)
@@ -223,3 +230,7 @@
# 4.1.x version
V_4_1_0 = KafkaVersion("4.1.0")
LATEST_4_1 = V_4_1_0
+
+# 4.2.x version
+V_4_2_0 = KafkaVersion("4.2.0")
+LATEST_4_2 = V_4_2_0
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index c639e34..d1e51a6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -650,7 +650,7 @@
}
if (commandConfigFile != null) {
try {
- consumerProps.putAll(Utils.loadProps(res.getString(commandConfigFile)));
+ consumerProps.putAll(Utils.loadProps(commandConfigFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}