Group id is mandatory configurtion option for confluent_kafka 2.4.0+ (#39559)
The group.id parameter has been optional before 2.4.0 version of
confluent_kafka relased on 7th of May. This started to fail our
integration tests (cool).
This PR adds "group.id" as extra field and sets the defaults for
integration testing.
diff --git a/airflow/providers/apache/kafka/hooks/base.py b/airflow/providers/apache/kafka/hooks/base.py
index 189c4cd..2f99cb2 100644
--- a/airflow/providers/apache/kafka/hooks/base.py
+++ b/airflow/providers/apache/kafka/hooks/base.py
@@ -49,7 +49,7 @@
"hidden_fields": ["schema", "login", "password", "port", "host"],
"relabeling": {"extra": "Config Dict"},
"placeholders": {
- "extra": '{"bootstrap.servers": "localhost:9092"}',
+ "extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}',
},
}
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b5a5c46..f25594e 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -424,7 +424,7 @@
Connection(
conn_id="kafka_default",
conn_type="kafka",
- extra=json.dumps({"bootstrap.servers": "broker:29092"}),
+ extra=json.dumps({"bootstrap.servers": "broker:29092", "group.id": "my-group"}),
),
session,
)
diff --git a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py
index 1200a96..9597456 100644
--- a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py
+++ b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py
@@ -25,7 +25,7 @@
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
from airflow.utils import db
-client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092"}
+client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"}
@pytest.mark.integration("kafka")
diff --git a/tests/integration/providers/apache/kafka/operators/test_produce.py b/tests/integration/providers/apache/kafka/operators/test_produce.py
index be76fe5..f2fa935 100644
--- a/tests/integration/providers/apache/kafka/operators/test_produce.py
+++ b/tests/integration/providers/apache/kafka/operators/test_produce.py
@@ -41,6 +41,7 @@
"""
def setup_method(self):
+ GROUP = "operator.producer.test.integration.test_1"
db.merge_conn(
Connection(
conn_id="kafka_default",
@@ -50,6 +51,7 @@
"socket.timeout.ms": 10,
"message.timeout.ms": 10,
"bootstrap.servers": "broker:29092",
+ "group.id": GROUP,
}
),
)