# blob: 6406990ac3dc745fcb3f9d7de88ebfaf4c2340b8 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
---
# This role installs Kafka on the hosts in group 'kafkas' of the environment
# inventory and connects each broker to Zookeeper.
# Make sure the host directory for Kafka certificates exists. The container
# start task later bind-mounts this directory into the broker at /config.
- name: "create kafka certificate directory"
  file:
    path: "{{ config_root_dir }}/kafka/certs"
    state: directory
    # Quoted so the value reaches the module as an octal string instead of
    # depending on the YAML parser treating a leading-zero integer as octal.
    # NOTE(review): 0777 is world-writable; kept as-is because the user the
    # container runs as is not visible here — confirm whether it can be tightened.
    mode: "0777"
# SSL deployments only: copy the configured keystore file from files/ into the
# Kafka certificate directory on the target host.
- name: "copy keystore"
  copy:
    dest: "{{ config_root_dir }}/kafka/certs"
    src: "files/{{ kafka.ssl.keystore.name }}"
  when: kafka.protocol == 'SSL'
# Build the environment variables that every broker gets regardless of the
# configured protocol. The protocol-specific variables are merged into this
# fact by the "join ..." tasks further down.
- name: add kafka default env vars
  set_fact:
    kafka_env_vars:
      # Broker identity: the host's position in the 'kafkas' inventory group.
      KAFKA_BROKER_ID: "{{ groups['kafkas'].index(inventory_hostname) }}"
      KAFKA_ZOOKEEPER_CONNECT: "{{ zookeeper_connect_string }}"
      # Topic and replication settings.
      KAFKA_DEFAULT_REPLICATION_FACTOR: "{{ kafka.replicationFactor }}"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "{{ kafka.offsetsTopicReplicationFactor }}"
      # Kept as a quoted string so the value is passed on as text, not a boolean.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      # Sizing: the same value is used for the maximum and initial heap.
      KAFKA_HEAP_OPTS: "-Xmx{{ kafka.heap }} -Xms{{ kafka.heap }}"
      KAFKA_NUM_NETWORK_THREADS: "{{ kafka.networkThreads }}"
      TZ: "{{ docker.timezone }}"
# Plaintext deployments: a single EXTERNAL listener is used for both client
# and inter-broker traffic. Every port is kafka.advertisedPort offset by the
# host's index in the 'kafkas' group.
- name: add kafka non-ssl vars
  set_fact:
    kafka_non_ssl_vars:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "EXTERNAL"
      KAFKA_LISTENERS: "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
      KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
      KAFKA_PORT: "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
      KAFKA_ADVERTISED_PORT: "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
  when: kafka.protocol != 'SSL'
# SSL deployments: define two listeners per broker.
#   EXTERNAL (SSL)       on kafka.advertisedPort + broker index
#   INTERNAL (PLAINTEXT) on kafka.port + broker index, used between brokers
# The keystore file doubles as the truststore, and one password is used for the
# keystore, the key and the truststore.
- name: add kafka ssl env vars
  when: kafka.protocol == 'SSL'
  # This fact contains the keystore password, and set_fact results are shown in
  # verbose output; keep it out of logs and callback output.
  no_log: true
  set_fact:
    kafka_ssl_env_vars:
      "KAFKA_ADVERTISED_PORT": "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
      "KAFKA_PORT": "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
      "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:SSL"
      "KAFKA_LISTENERS": "EXTERNAL://:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }},INTERNAL://:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
      "KAFKA_ADVERTISED_LISTENERS": "EXTERNAL://{{ ansible_host }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }},INTERNAL://{{ ansible_host }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
      "KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL"
      # /config is the in-container mount point of the host certs directory.
      "KAFKA_SSL_KEYSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
      "KAFKA_SSL_KEYSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
      "KAFKA_SSL_KEY_PASSWORD": "{{ kafka.ssl.keystore.password }}"
      "KAFKA_SSL_TRUSTSTORE_LOCATION": "/config/{{ kafka.ssl.keystore.name }}"
      "KAFKA_SSL_TRUSTSTORE_PASSWORD": "{{ kafka.ssl.keystore.password }}"
      "KAFKA_SSL_CLIENT_AUTH": "{{ kafka.ssl.client_authentication }}"
      # The list-valued settings are flattened to comma-separated strings.
      "KAFKA_SSL_CIPHER_SUITES": "{{ kafka.ssl.cipher_suites | join(',') }}"
      "KAFKA_SSL_ENABLED_PROTOCOLS": "{{ kafka.ssl.protocols | join(',') }}"
# SSL deployments: merge the SSL-specific variables into the common broker
# environment. Keys from kafka_ssl_env_vars win on conflict.
- name: "join kafka ssl env vars"
  when: kafka.protocol == 'SSL'
  # The merged fact carries the keystore passwords, so hide the task result
  # from logs and callback output.
  no_log: true
  set_fact:
    kafka_env_vars: "{{ kafka_env_vars | combine(kafka_ssl_env_vars) }}"
# Plaintext deployments: fold the non-SSL listener settings into the common
# broker environment. Keys from kafka_non_ssl_vars win on conflict.
- name: "join kafka non-ssl env vars"
  set_fact:
    kafka_env_vars: "{{ kafka_env_vars | combine(kafka_non_ssl_vars) }}"
  when: kafka.protocol != 'SSL'
# Start the broker container for this host. `recreate: true` replaces any
# existing container of the same name on every run. The container is named
# kafka<N>, where N is the host's index in the 'kafkas' inventory group.
- name: "(re)start kafka using '{{ kafka_image }}' "
  vars:
    # Use kafka.docker_image when it is set; otherwise fall back to the
    # wurstmeister image tagged with kafka.version.
    kafka_image: "{{ kafka.docker_image | default ('wurstmeister/kafka:' ~ kafka.version) }}"
  docker_container:
    name: "kafka{{ groups['kafkas'].index(inventory_hostname) }}"
    image: "{{ kafka_image }}"
    state: started
    recreate: true
    restart_policy: "{{ docker.restart.policy }}"
    env: "{{ kafka_env_vars }}"
    # Publish both per-broker ports one-to-one on the host.
    ports:
      - "{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.port + groups['kafkas'].index(inventory_hostname) }}"
      - "{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}:{{ kafka.advertisedPort + groups['kafkas'].index(inventory_hostname) }}"
    # The host certs directory is exposed as /config, which is the path the
    # KAFKA_SSL_*_LOCATION variables point to.
    volumes:
      - "{{ config_root_dir }}/kafka/certs:/config"
    pull: "{{ kafka.pull_kafka | default(true) }}"
# Poll the first Zookeeper host until this broker's id shows up in the output
# of the `dump` command. Retries up to 10 times with 5 seconds between attempts.
- name: wait until the kafka server started up
  # `sleep 1` keeps the connection open briefly after sending `dump`.
  # `grep -w` matches the id as a whole word, so waiting for broker 1 is not
  # satisfied by /brokers/ids/10 and similar.
  shell: >-
    (echo dump; sleep 1) |
    nc {{ hostvars[groups['zookeepers'] | first].ansible_host }} {{ zookeeper.port }} |
    grep -w /brokers/ids/{{ groups['kafkas'].index(inventory_hostname) }}
  register: result
  until: (result.rc == 0)
  retries: 10
  delay: 5
  # This is a read-only probe, so it should never be reported as a change.
  changed_when: false