upgrade kafka server version from 0.11.0.1 to 2.3.1 (#4802)

1. Major jump in kafka docker image tag from 0.11.0.1 to 2.12-2.3.1.
2. Adapt KafkaConnectorTests regex to the new format of the "started" log message

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 5e6d1b4..7fb49c0 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -149,7 +149,7 @@
     protocols:
     - TLSv1.2
   protocol: "{{ kafka_protocol_for_setup }}"
-  version: 0.11.0.1
+  version: 2.12-2.3.1
   port: 9072
   advertisedPort: 9093
   ras:
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index 32f320f..6406990 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -47,7 +47,10 @@
     kafka_non_ssl_vars:
       "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+      "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "EXTERNAL:PLAINTEXT"
+      "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_INTER_BROKER_LISTENER_NAME": "EXTERNAL"
 
 - name: add kafka ssl env vars
   when: kafka.protocol == 'SSL'
@@ -56,9 +59,9 @@
       "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
       "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
-      "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
-      "KAFKA_PROTOCOL_NAME": "INTERNAL"
+      "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }},INTERNAL://:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }},INTERNAL://{{ ansible_host }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+      "KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL"
       "KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
       "KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
       "KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
@@ -67,11 +70,6 @@
       "KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
       "KAFKA_SSL_CIPHER_SUITES": "{{ kafka.ssl.cipher_suites | join(',') }}"
       "KAFKA_SSL_ENABLED_PROTOCOLS": "{{ kafka.ssl.protocols | join(',') }}"
-    # The sed script passed in CUSTOM_INIT_SCRIPT fixes a bug in the wurstmeister dcoker image
-    # by patching the server.configuration file right before kafka is started.
-    # The script adds the missing advertized hostname to the advertised.listener property
-    # Issue: https://github.com/wurstmeister/kafka-docker/issues/221
-      "CUSTOM_INIT_SCRIPT": sed -i \'/^advertised\\.listeners/ s/\\/\\/\\:/\\/\\/{{ ansible_host }}\\:/\' /opt/kafka/config/server.properties
 
 - name: "join kafka ssl env vars"
   when: kafka.protocol == 'SSL'
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index c0a4929..6064d56 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -148,7 +148,7 @@
       kafkaHosts.indices.foreach { i =>
         val message = createMessage()
         val kafkaHost = kafkaHosts(i).split(":")(0)
-        val startLog = s", started"
+        val startLog = s"\\[KafkaServer id=$i\\] started"
         val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
 
         // 1. stop one of kafka node