upgrade kafka server version from 0.11.0.1 to 2.3.1 (#4802)
1. Major jump in kafka docker image tag from 0.11.0.1 to 2.12-2.3.1.
2. Adapt KafkaConnectorTest regex to new format of "started" log message
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 5e6d1b4..7fb49c0 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -149,7 +149,7 @@
protocols:
- TLSv1.2
protocol: "{{ kafka_protocol_for_setup }}"
- version: 0.11.0.1
+ version: 2.12-2.3.1
port: 9072
advertisedPort: 9093
ras:
diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml
index 32f320f..6406990 100644
--- a/ansible/roles/kafka/tasks/deploy.yml
+++ b/ansible/roles/kafka/tasks/deploy.yml
@@ -47,7 +47,10 @@
kafka_non_ssl_vars:
"KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
- "KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
+ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "EXTERNAL:PLAINTEXT"
+ "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_INTER_BROKER_LISTENER_NAME": "EXTERNAL"
- name: add kafka ssl env vars
when: kafka.protocol == 'SSL'
@@ -56,9 +59,9 @@
"KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
- "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
- "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
- "KAFKA_PROTOCOL_NAME": "INTERNAL"
+ "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }},INTERNAL://:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }},INTERNAL://{{ ansible_host }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
+ "KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL"
"KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
"KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
"KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
@@ -67,11 +70,6 @@
"KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
"KAFKA_SSL_CIPHER_SUITES": "{{ kafka.ssl.cipher_suites | join(',') }}"
"KAFKA_SSL_ENABLED_PROTOCOLS": "{{ kafka.ssl.protocols | join(',') }}"
- # The sed script passed in CUSTOM_INIT_SCRIPT fixes a bug in the wurstmeister dcoker image
- # by patching the server.configuration file right before kafka is started.
- # The script adds the missing advertized hostname to the advertised.listener property
- # Issue: https://github.com/wurstmeister/kafka-docker/issues/221
- "CUSTOM_INIT_SCRIPT": sed -i \'/^advertised\\.listeners/ s/\\/\\/\\:/\\/\\/{{ ansible_host }}\\:/\' /opt/kafka/config/server.properties
- name: "join kafka ssl env vars"
when: kafka.protocol == 'SSL'
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index c0a4929..6064d56 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -148,7 +148,7 @@
kafkaHosts.indices.foreach { i =>
val message = createMessage()
val kafkaHost = kafkaHosts(i).split(":")(0)
- val startLog = s", started"
+ val startLog = s"\\[KafkaServer id=$i\\] started"
val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
// 1. stop one of kafka node