blob: 28a56b152d77af4a1b1bce804e6139f11b73fcbd [file] [log] [blame]
#!/usr/bin/env python
#################################################################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#################################################################################################################################
import sys
import paramiko
import os.path
import random
import requests
import logging
from time import sleep
from optparse import OptionParser
#################################################################################################################################
# A "chaos monkey"-like script that periodically kills parts of Samza, including YARN (RM, NM), Kafka, and Samza (AM, container)
# This script depends on paramiko, an ssh library
#################################################################################################################################
# Create an ssh connection to the given host
def connect(host):
    """Open an SSH session to ``host`` and return the connected paramiko client.

    Host keys are loaded from the system known-hosts files, and the
    missing-host-key policy is set to paramiko's ``WarningPolicy``.
    """
    ssh = paramiko.SSHClient()
    ssh.load_system_host_keys()
    ssh.set_missing_host_key_policy(paramiko.WarningPolicy())
    ssh.connect(host)
    return ssh
# Run the given command line using the ssh connection and return its stdout (raise an error if there is output on stderr and nothing on stdout)
def execute(conn, cmd):
    """Run ``cmd`` through the SSH connection ``conn`` and return its stdout.

    Both stdout and stderr are read to the end. When stdout is non-empty it is
    logged and returned, and stderr is ignored. When stdout is empty and
    stderr is not, stderr is logged and an Exception is raised. When both are
    empty the function returns None.
    """
    logging.info("Executing command {0}".format(cmd))
    _, out_stream, err_stream = conn.exec_command(cmd)
    captured_out = out_stream.read()
    captured_err = err_stream.read()
    # Any stdout output counts as success, regardless of what is on stderr.
    if captured_out:
        logging.info(captured_out)
        return captured_out
    if not captured_err:
        return None
    logging.error(captured_err)
    raise Exception("Error executing command: %s" % captured_err)
# Unfortunately pkill on mac seems to have a length limit which prevents it working with our epic java command line arguments
def pkill(pid):
    """Return the shell command that kills the process(es) named by ``pid``.

    ``pid`` may be an int, or the raw text produced by the ``get_pid``
    pipeline, which ends with a newline and holds one pid per line when
    several processes match. All whitespace is collapsed so that every pid
    ends up as an argument of a single ``kill`` invocation instead of the
    second and later pids being run as separate shell commands.
    """
    # On Python 3 remote output arrives as bytes; formatting bytes directly
    # would embed the b'...' repr in the command line.
    if isinstance(pid, bytes) and not isinstance(pid, str):
        pid = pid.decode()
    pids = str(pid).split()
    return "kill {0}".format(" ".join(pids))
def get_pid(pattern):
    """Build the shell pipeline that lists the pids of processes matching ``pattern``.

    The pipeline greps ``ps aux`` for ``pattern``, drops the grep process
    itself, and prints the second field of each remaining row. ``pattern`` is
    spliced into the command unquoted, so it is interpreted by the shell.
    """
    pieces = ["ps aux | grep ", pattern, " | grep -v 'grep' | awk -F' *' '{print $2}'"]
    return "".join(pieces)
# Kill the kafka broker
def kill_kafka(options):
    """Kill the Kafka broker on ``options.kafka_host`` and start it again.

    Looks up the broker pid over SSH, kills it, waits 20 seconds, then
    relaunches the broker in the background from ``options.kafka_dir`` with
    its output redirected to ``<kafka_dir>/logs/kafka.log``. If no broker
    process is found, nothing is killed or restarted.

    Raises whatever ``connect`` or ``execute`` raise; the SSH connection is
    closed on every path.
    """
    connection = connect(options.kafka_host)
    try:
        logging.info("Killing Kafka Broker ...")
        kafka_pid = execute(connection, get_pid("kafka.Kafka"))
        if kafka_pid is None:
            logging.info("Could not determine Kafka broker process. Not doing anything")
            return
        execute(connection, pkill(kafka_pid))
        # Pause between the kill and the restart (presumably to let the broker exit).
        sleep(20)
        logging.info("Restarting Kafka Broker...")
        start_command = (
            "nohup {0}/bin/kafka-server-start.sh {0}/config/server.properties"
            " > {0}/logs/kafka.log 2>&1 &"
        ).format(options.kafka_dir)
        execute(connection, start_command)
    finally:
        # Previously the connection leaked when no broker was found or a command failed.
        connection.close()
def query_yarn(host, port, query):
    """Send an HTTP GET for ``query`` to ``host``:``port`` and return the ``requests`` response."""
    url = "http://{0}:{1}/{2}".format(host, port, query)
    return requests.get(url)
def get_app_ids_running(host, port):
    """Return ``(id, name)`` tuples for the applications the ResourceManager reports as RUNNING.

    host, port: address that is passed to ``query_yarn`` for the REST call.

    Raises Exception when the response carries no applications, since the
    kill script has nothing to act on in that case.
    """
    logging.info("Querying RM for RUNNING application information")
    # The apps endpoint filters on "states"; the previous "status" parameter is
    # not one the endpoint defines, so applications in every state came back.
    list_apps_command = "ws/v1/cluster/apps?states=RUNNING"
    response = query_yarn(host, port, list_apps_command).json()
    # "apps" may be absent or null when nothing matches.
    apps_by_key = response.get('apps') if response else None
    app_info = []
    if apps_by_key:
        # Flatten every list stored under "apps" (no reliance on the Python-2-only builtin reduce).
        app_info = [(app['id'], app['name']) for group in apps_by_key.values() for app in group]
    if not app_info:
        raise Exception("Got an empty apps response back. Can't run kill script without Samza jobs running.")
    return app_info
# Kill the samza container instances
def kill_containers(hosts, app_id):
    """Kill the Samza container process of ``app_id`` on each host in ``hosts``.

    For every host, the process list is searched over SSH for
    "samza-container-0" rows that also mention ``app_id``; any match is
    killed. A message is logged when no host had a matching process, which
    includes the case of an empty ``hosts`` list.

    Raises whatever ``connect`` or ``execute`` raise; each SSH connection is
    closed even when a command fails.
    """
    found_any = False
    for host in hosts:
        connection = connect(host)
        try:
            pid = execute(connection, get_pid("samza-container-0 | grep {0}".format(app_id)))
            if pid:
                found_any = True
                logging.info("Killing samza container on {0} with pid {1}".format(host, pid))
                execute(connection, pkill(pid))
        finally:
            connection.close()
    # Track matches across all hosts: checking only the last host's pid both
    # misreported earlier kills and failed outright when hosts was empty.
    if not found_any:
        logging.info("Couldn't find any container on the list of hosts. Nothing to kill :(")
def require_arg(options, name):
    """Exit with status 1 unless ``options`` has a non-None attribute ``name``.

    A "Missing required property: <name>" line is written to stderr before
    exiting. Returns None when the attribute is present and not None.
    """
    if getattr(options, name, None) is None:
        # sys.stderr.write emits the same text as the Python-2-only
        # "print >> sys.stderr" statement and also works on Python 3.
        sys.stderr.write("Missing required property: {0}\n".format(name))
        sys.exit(1)
# Command line options
parser = OptionParser()
# File that is read line by line for the hosts to search for containers.
parser.add_option("--node-list", dest="filename", help="A list of nodes in the YARN cluster", metavar="nodes.txt")
# Parsed as an int so a malformed value is rejected by optparse at startup
# instead of failing later inside the kill loop.
parser.add_option("--kill-time", dest="kill_time", type="int", help="The time in seconds to sleep between kills", metavar="s", default=90)
parser.add_option("--kafka-dir", dest="kafka_dir", help="The directory in which to find kafka", metavar="dir")
parser.add_option("--kafka-host", dest="kafka_host", help="Host on which Kafka is installed", metavar="localhost")
parser.add_option("--yarn-dir", dest="yarn_dir", help="The directory in which to find yarn", metavar="dir")
# Flags selecting which components are eligible for killing; both default to off.
parser.add_option("--kill-kafka", action="store_true", dest="kill_kafka", default=False, help="Should we kill Kafka?")
parser.add_option("--kill-container", action="store_true", dest="kill_container", default=False, help="Should we kill Application Container?")
parser.add_option("--yarn-host", dest="yarn_host", help="Host that will respond to Yarn REST queries ", metavar="localhost")
(options, args) = parser.parse_args()
# Log every record to a file under /tmp and mirror it on the console using
# the same message layout and timestamp format.
kill_script_log_path = '/tmp/samza-kill-log.log'
log_layout = '%(asctime)s %(message)s'
log_timestamp = '%m/%d/%Y %I:%M:%S'
logging.basicConfig(
    format=log_layout,
    datefmt=log_timestamp,
    filename=kill_script_log_path,
    level=logging.INFO,
)
formatter = logging.Formatter(fmt=log_layout, datefmt=log_timestamp)
console = logging.StreamHandler()
console.setFormatter(formatter)
# The empty name selects the root logger, which basicConfig configured above.
logging.getLogger('').addHandler(console)
# Names of the components eligible for killing, filled in from the --kill-* flags.
components = []
# (app id, app name) pairs returned by get_app_ids_running; only populated for
# --kill-container. Starts as a list so its type matches what is assigned below.
app_info = []
if options.kill_container:
    require_arg(options, 'filename')
    require_arg(options, 'yarn_host')
    require_arg(options, 'yarn_dir')
    components.append("container")
    # One host per line. The file is closed as soon as it is read, and blank
    # lines are skipped so they do not turn into empty host names.
    with open(options.filename) as node_file:
        hosts = [line.strip() for line in node_file if line.strip()]
    if len(hosts) < 1:
        # sys.stderr.write works on both Python 2 and 3, unlike "print >> sys.stderr".
        sys.stderr.write("No hosts in host file.\n")
        sys.exit(1)
    # NOTE(review): 8088 is presumably the ResourceManager web/REST port - confirm for this cluster.
    app_info = get_app_ids_running(options.yarn_host, 8088)
if options.kill_kafka:
    require_arg(options, 'kafka_host')
    require_arg(options, 'kafka_dir')
    components.append("kafka")
# With nothing selected, show the usage text; otherwise loop forever, picking
# one selected component at random each round, killing it, and sleeping.
if not components:
    parser.print_help()
else:
    kill_time = int(options.kill_time)
    last_index = len(components) - 1
    while True:
        # Only draw a random number when there is an actual choice to make.
        component_id = random.randint(0, last_index) if last_index > 0 else 0
        target = components[component_id]
        if target == "kafka":
            logging.info("Choosing Kafka broker on {0}".format(options.kafka_host))
            kill_kafka(options)
            # A Kafka kill is followed by twice the configured pause.
            pause = kill_time * 2
        elif target == "container":
            chosen_app = app_info[random.randint(0, len(app_info) - 1)]
            logging.info("Choosing a Samza Container for {0}".format(chosen_app))
            # The first element of the chosen entry is the application id.
            kill_containers(hosts, chosen_app[0])
            pause = kill_time
        else:
            continue
        logging.info("Sleeping for {0}".format(pause))
        sleep(pause)