Group id is mandatory configurtion option for confluent_kafka 2.4.0+ (#39559)

The group.id parameter has been optional before 2.4.0 version of
confluent_kafka relased on 7th of May. This started to fail our
integration tests (cool).

This PR adds "group.id" as extra field and sets the defaults for
integration testing.
diff --git a/airflow/providers/apache/kafka/hooks/base.py b/airflow/providers/apache/kafka/hooks/base.py
index 189c4cd..2f99cb2 100644
--- a/airflow/providers/apache/kafka/hooks/base.py
+++ b/airflow/providers/apache/kafka/hooks/base.py
@@ -49,7 +49,7 @@
             "hidden_fields": ["schema", "login", "password", "port", "host"],
             "relabeling": {"extra": "Config Dict"},
             "placeholders": {
-                "extra": '{"bootstrap.servers": "localhost:9092"}',
+                "extra": '{"bootstrap.servers": "localhost:9092", "group.id": "my-group"}',
             },
         }
 
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b5a5c46..f25594e 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -424,7 +424,7 @@
         Connection(
             conn_id="kafka_default",
             conn_type="kafka",
-            extra=json.dumps({"bootstrap.servers": "broker:29092"}),
+            extra=json.dumps({"bootstrap.servers": "broker:29092", "group.id": "my-group"}),
         ),
         session,
     )
diff --git a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py
index 1200a96..9597456 100644
--- a/tests/integration/providers/apache/kafka/hooks/test_admin_client.py
+++ b/tests/integration/providers/apache/kafka/hooks/test_admin_client.py
@@ -25,7 +25,7 @@
 from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
 from airflow.utils import db
 
-client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092"}
+client_config = {"socket.timeout.ms": 1000, "bootstrap.servers": "broker:29092", "group.id": "my-group"}
 
 
 @pytest.mark.integration("kafka")
diff --git a/tests/integration/providers/apache/kafka/operators/test_produce.py b/tests/integration/providers/apache/kafka/operators/test_produce.py
index be76fe5..f2fa935 100644
--- a/tests/integration/providers/apache/kafka/operators/test_produce.py
+++ b/tests/integration/providers/apache/kafka/operators/test_produce.py
@@ -41,6 +41,7 @@
     """
 
     def setup_method(self):
+        GROUP = "operator.producer.test.integration.test_1"
         db.merge_conn(
             Connection(
                 conn_id="kafka_default",
@@ -50,6 +51,7 @@
                         "socket.timeout.ms": 10,
                         "message.timeout.ms": 10,
                         "bootstrap.servers": "broker:29092",
+                        "group.id": GROUP,
                     }
                 ),
             )