blob: a5de38bd35095142a6adef48a0d8f9576063ac42 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.
---
# This role will install Kafka with Zookeeper in group 'kafka' in the environment inventory
- name: "create kafka certificate directory"
file:
path: "{{ config_root_dir }}/kafka/certs"
state: directory
mode: 0777
- name: "copy keystore"
when: kafka.protocol == 'SSL'
copy:
src: "files/{{ kafka.ssl.keystore.name }}"
dest: "{{ config_root_dir }}/kafka/certs"
- name: add kafka default env vars
set_fact:
kafka_env_vars:
"KAFKA_DEFAULT_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
"KAFKA_BROKER_ID": "{{ groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_HEAP_OPTS": "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
"KAFKA_ZOOKEEPER_CONNECT": "{{ zookeeper_connect_string }}"
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "{{ kafka.replicationFactor }}"
"KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false"
"KAFKA_NUM_NETWORK_THREADS": "{{ kafka.networkThreads }}"
- name: add kafka non-ssl vars
when: kafka.protocol != 'SSL'
set_fact:
kafka_non_ssl_vars:
"KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_ADVERTISED_HOST_NAME": "{{ ansible_host }}"
- name: add kafka ssl env vars
when: kafka.protocol == 'SSL'
set_fact:
kafka_ssl_env_vars:
"KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
"KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
"KAFKA_PROTOCOL_NAME": "INTERNAL"
"KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
"KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
"KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
"KAFKA_SSL_TRUSTSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
"KAFKA_SSL_TRUSTSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
"KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
"KAFKA_SSL_CIPHER_SUITES": "{{ kafka.ssl.cipher_suites | join(',') }}"
# The sed script passed in CUSTOM_INIT_SCRIPT fixes a bug in the wurstmeister dcoker image
# by patching the server.configuration file right before kafka is started.
# The script adds the missing advertized hostname to the advertised.listener property
# Issue: https://github.com/wurstmeister/kafka-docker/issues/221
"CUSTOM_INIT_SCRIPT": sed -i \'/^advertised\\.listeners/ s/\\/\\/\\:/\\/\\/{{ ansible_host }}\\:/\' /opt/kafka/config/server.properties
- name: "join kafka ssl env vars"
when: kafka.protocol == 'SSL'
set_fact:
kafka_env_vars: "{{ kafka_env_vars | combine(kafka_ssl_env_vars) }}"
- name: join kafka non-ssl env vars
when: kafka.protocol != 'SSL'
set_fact:
kafka_env_vars: "{{ kafka_env_vars | combine(kafka_non_ssl_vars) }}"
- name: "(re)start kafka using '{{ kafka_image }}' "
vars:
zookeeper_idx: "{{ groups['kafkas'].index(inventory_hostname) % (groups['zookeepers'] | length) }}"
kafka_image: "{{ kafka.docker_image | default ('wurstmeister/kafka:' ~ kafka.version) }}"
docker_container:
name: kafka{{ groups['kafkas'].index(inventory_hostname) }}
image: "{{ kafka_image }}"
state: started
recreate: true
restart_policy: "{{ docker.restart.policy }}"
env: "{{ kafka_env_vars }}"
ports:
- "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
- "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
volumes:
- "{{ config_root_dir }}/kafka/certs:/config"
pull: true
retries: "{{ docker.pull.retries }}"
delay: "{{ docker.pull.delay }}"
- name: wait until the kafka server started up
shell:
(echo dump; sleep 1) |
nc {{hostvars[groups['zookeepers']|first].ansible_host}} {{zookeeper.port}} |
grep /brokers/ids/{{ groups['kafkas'].index(inventory_hostname) }}
register: result
until: (result.rc == 0)
retries: 10
delay: 5