# Source blob: 325c65c0d0c7e821d4181c453bca6a748f08d615
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import os
import inspect
import pprint
import util
from resource_management import *
# Module-wide logger. getLogger() is called without a name, so this is the
# root logger; every logger.info(...) call in this file goes through it.
logger = logging.getLogger()
class Kafka(Script):
    """Lifecycle script (install/configure/start/stop/status) for a Kafka broker.

    Every public method receives the ``env`` object handed in by the script
    framework and publishes the relevant parameter module on it with
    ``env.set_params`` before doing any work.

    NOTE(review): ``format`` below is the helper pulled in by
    ``from resource_management import *``, not the builtin. Judging by how it
    is used here (``{server_conf}`` and ``{pid}`` are plain locals that are
    never passed in, ``{app_root}`` / ``{pid_file}`` are not locals at all),
    it resolves placeholders from the caller's local variables and from the
    published params. Keep those local names stable and avoid introducing
    extra locals around ``format`` calls -- confirm against the library
    before refactoring further.
    """

    def install(self, env):
        """Install the component's packages via ``install_packages``."""
        self.install_packages(env)

    def configure(self, env):
        """Publish ``params`` on ``env``; nothing else is configured here."""
        import params
        env.set_params(params)

    def start(self, env):
        """Prepare logging and broker properties, then launch the broker.

        Steps, in order: publish params, log diagnostics, route Kafka's log
        output under ``params.app_log_dir``, write ``server.slider.properties``
        from ``params.broker_config``, export ``LOG_DIR`` (and
        ``KAFKA_HEAP_OPTS`` when heap sizes are set), and run
        ``kafka-server-start.sh`` with ``wait_for_finish=False``.
        """
        import params
        env.set_params(params)
        self.configure(env)

        self._log_startup_diagnostics(params.broker_config)
        self._route_kafka_logs(params.kafka_version, params.app_root, params.app_log_dir)

        # update the broker properties for different brokers
        server_conf = format("{params.conf_dir}/server.slider.properties")
        PropertiesFile(server_conf, properties=params.broker_config, owner=params.app_user)

        # execute the process
        process_cmd = format("{app_root}/bin/kafka-server-start.sh {server_conf}")
        self._export_broker_environment(params.app_log_dir, params.xmx, params.xms)
        Execute(process_cmd,
                user=params.app_user,
                logoutput=True,
                wait_for_finish=False,
                pid_file=params.pid_file
                )

    @staticmethod
    def _log_startup_diagnostics(broker_config):
        """Log the broker configuration and every environment variable."""
        # log the component configuration
        logger.info("broker component config: %s", pprint.pformat(broker_config, indent=4))
        # log the environment variables
        # NOTE(review): this dumps the whole environment into the log; confirm
        # that no credentials are passed to the script through it.
        logger.info("Env Variables:")
        for name, value in os.environ.items():
            logger.info("%10s %s \n", name, value)

    @staticmethod
    def _route_kafka_logs(kafka_version, app_root, app_log_dir):
        """Make Kafka's log output end up under ``app_log_dir + "/kafka"``."""
        kafka_log_dir = app_log_dir + "/kafka"
        if "0.8.1.1" in kafka_version:
            # For kafka 0.8.1.1, there is no way to set the log dir to a
            # location other than app_root + "/logs", so link to it instead.
            # os.symlink raises when the destination already exists (e.g. a
            # second start against the same log directory); lexists also
            # catches a dangling link left behind by an earlier run.
            if not os.path.lexists(kafka_log_dir):
                os.symlink(app_root + "/logs", kafka_log_dir)
        else:
            # NOTE (from the original author): this updating thing is changing
            # files in-place and it really should not (static cache).
            # TODO: a templated log4j.properties resource was sketched here in
            # the past but never enabled.
            util.updating(app_root + "/config/log4j.properties",
                          {"kafka.logs.dir": kafka_log_dir})

    @staticmethod
    def _export_broker_environment(app_log_dir, xmx, xms):
        """Set ``LOG_DIR`` and, if any heap size is given, ``KAFKA_HEAP_OPTS``."""
        os.environ['LOG_DIR'] = app_log_dir + "/kafka"
        # Each option keeps its leading space, exactly as before.
        heap_opts = ""
        if xmx:
            heap_opts += " -Xmx" + xmx
        if xms:
            heap_opts += " -Xms" + xms
        # Leave KAFKA_HEAP_OPTS untouched when neither size is configured.
        if heap_opts:
            os.environ['KAFKA_HEAP_OPTS'] = heap_opts

    def stop(self, env):
        """Kill the process named in the pid file, then remove the pid file.

        The plain ``kill`` is issued without ``ignore_failures``; the
        follow-up ``kill -9`` is issued with ``ignore_failures=True``.
        """
        import params
        env.set_params(params)
        # The backticks make the shell substitute the pid file's contents;
        # the redirection silences the kill commands built from this string.
        pid = format("`cat {pid_file}` >/dev/null 2>&1")
        Execute(format("kill {pid}"),
                user=params.app_user
                )
        # NOTE(review): kill -9 follows immediately, with no wait in between
        # for a graceful shutdown -- confirm that this is intended.
        Execute(format("kill -9 {pid}"),
                ignore_failures=True,
                user=params.app_user
                )
        Execute(format("rm -f {pid_file}"),
                user=params.app_user)

    def status(self, env):
        """Check the process behind ``status_params.pid_file``."""
        import status_params
        env.set_params(status_params)
        check_process_status(status_params.pid_file)
# Entry point when this file is run as a script. Kafka defines no execute()
# of its own, so the call resolves to the Script base class; presumably it
# dispatches to install/configure/start/stop/status -- confirm in the library.
if __name__ == "__main__":
    kafka_script = Kafka()
    kafka_script.execute()