# blob: c2f1fc480c7d81d8ba7de9bebff38045ec630080 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from charmhelpers.core import hookenv
from charmhelpers.core import host
from jujubigdata import utils
from charms.layer.apache_bigtop_base import Bigtop
from charms import layer
from subprocess import check_output
class Kafka(object):
    """
    Manage a Kafka broker deployed through Apache Bigtop.

    Covers port management, Bigtop/puppet driven configuration, and
    start/stop control of the ``kafka-server`` service.
    """

    def __init__(self):
        # Distribution settings (ports, etc.) are read from the
        # 'apache-bigtop-base' layer options.
        self.dist_config = utils.DistConfig(
            data=layer.options('apache-bigtop-base'))

    def open_ports(self):
        """Open every port that dist_config lists as exposed for 'kafka'."""
        for port in self.dist_config.exposed_ports('kafka'):
            hookenv.open_port(port)

    def close_ports(self):
        """Close every port that dist_config lists as exposed for 'kafka'."""
        for port in self.dist_config.exposed_ports('kafka'):
            hookenv.close_port(port)

    def configure_kafka(self, zk_units, network_interface=None):
        """
        Render the Bigtop site config for this broker, apply it with puppet,
        update the advertised hostname, and restart the service.

        :param zk_units: iterable of mappings, each providing a 'host' and a
            'port' entry for one connected zookeeper unit.
        :param network_interface: optional value handed to
            ``Bigtop.get_ip_for_interface``; when truthy, the address it
            returns is used as the broker bind address.
        :raises KeyError: if ``JUJU_UNIT_NAME`` is missing from the
            environment.
        :raises ValueError: if ``JUJU_UNIT_NAME`` contains no '/'.
        """
        # Build the zookeeper connection string from ip:port pairs of our
        # connected zookeepers; sorted so the result does not depend on the
        # order in which units are reported.
        zk_connect = ",".join(sorted(
            "%s:%s" % (utils.resolve_private_address(unit['host']),
                       unit['port'])
            for unit in zk_units
        ))

        # JUJU_UNIT_NAME is split on its first '/'; the part after it is
        # used as the broker id, the part before it is not needed here.
        _, unit_num = os.environ['JUJU_UNIT_NAME'].split('/', 1)
        kafka_port = self.dist_config.port('kafka')

        # One Bigtop instance serves both the interface lookup and the
        # site.yaml rendering / puppet run below.
        bigtop = Bigtop()

        roles = ['kafka-server']
        override = {
            'kafka::server::broker_id': unit_num,
            'kafka::server::port': kafka_port,
            'kafka::server::zookeeper_connection_string': zk_connect,
        }
        if network_interface:
            override['kafka::server::bind_addr'] = \
                bigtop.get_ip_for_interface(network_interface)

        bigtop.render_site_yaml(roles=roles, overrides=override)
        bigtop.trigger_puppet()

        # Puppet has just (re)written the config, so patch the advertised
        # hostname afterwards and restart to pick everything up.
        self.set_advertise()
        self.restart()

    def restart(self):
        """Stop the kafka-server service, then start it again."""
        self.stop()
        self.start()

    def start(self):
        """Start the kafka-server service."""
        host.service_start('kafka-server')

    def stop(self):
        """Stop the kafka-server service."""
        host.service_stop('kafka-server')

    def set_advertise(self):
        """
        Set ``advertised.host.name`` in server.properties to this machine's
        short hostname (the output of ``hostname -s``).
        """
        short_host = check_output(['hostname', '-s']).decode('utf8').strip()

        # Configure server.properties
        # NB: Kafka is told to advertise our short hostname. The admin will
        # still have to expose kafka and ensure that external clients can
        # resolve the short hostname to our public ip.
        kafka_server_conf = '/etc/kafka/conf/server.properties'
        utils.re_edit_in_place(kafka_server_conf, {
            # Dots are escaped so only the literal property name matches;
            # the optional leading '#' also catches a commented-out entry.
            r'^#?advertised\.host\.name=.*':
                'advertised.host.name=%s' % short_host,
        })