blob: 6caf196c592d4929ff1ff2f3c4ec5170864a2db0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' standalone.py '''
from __future__ import print_function
from collections import OrderedDict
from subprocess import call
import subprocess
import sys
import os
import tempfile
import tarfile
import argparse
import socket
import requests
import time
import netifaces
import yaml
import json
from heron.common.src.python.utils.log import Log
from heron.tools.cli.src.python.result import SimpleResult, Status
import heron.tools.cli.src.python.args as cli_args
import heron.tools.common.src.python.utils.config as config
# pylint: disable=anomalous-backslash-in-string
# pylint: disable=unused-argument
# pylint: disable=too-many-branches
class Action(object):
    '''Names of the actions accepted by the ``standalone`` command.'''
    # Each value doubles as the sub-parser name and as the value stored
    # under the "action" key of the parsed arguments.
    CLUSTER = "cluster"
    GET = "get"
    INFO = "info"
    SET = "set"
    TEMPLATE = "template"
# Name of the positional argument registered on the cluster, template and get
# sub-parsers; run() reads the parsed value back as cl_args["type"].
TYPE = "type"
class Role(object):
    '''Keys of the role mapping produced by ``read_and_parse_roles``.'''
    # CLUSTER and ZOOKEEPERS are read from the inventory file; MASTERS and
    # SLAVES are derived from the CLUSTER entry.
    CLUSTER = "cluster"
    ZOOKEEPERS = "zookeepers"
    MASTERS = "masters"
    SLAVES = "slaves"
class Cluster(object):
    '''Valid values of the ``type`` argument of the ``cluster`` action.'''
    STOP = "stop"
    START = "start"
class Get(object):
    '''Valid values of the ``type`` argument of the ``get`` action.'''
    HERON_UI_URL = "heron-ui-url"
    HERON_TRACKER_URL = "heron-tracker-url"
    SERVICE_URL = "service-url"
################################################################################
def create_parser(subparsers):
    '''
    Register the ``standalone`` command and all of its actions.

    :param subparsers: sub-parser collection the command is attached to
    :return: the parser created for the ``standalone`` command
    '''
    # Multi-line help texts, rendered verbatim by RawTextHelpFormatter.
    cluster_help = "\n".join([
        "",
        "Choices supports the following:",
        "start - Start standalone Heron cluster",
        "stop - Stop standalone Heron cluster",
        "",
    ])
    get_help = "\n".join([
        "",
        "Choices supports the following:",
        "service-url - Get the service url for standalone cluster",
        "heron-tracker-url - Get the url for the heron tracker in standalone cluster",
        "heron-ui-url - Get the url for the heron ui standalone cluster",
        "",
    ])

    parser = subparsers.add_parser(
        'standalone',
        help='Start a standalone Heron cluster',
        add_help=True)
    cli_args.add_titles(parser)
    actions = parser.add_subparsers()

    def _new_action(name, help_text):
        # Every action gets its own sub-parser that records its name under
        # the "action" key of the parsed arguments.
        action_parser = actions.add_parser(
            name,
            help=help_text,
            add_help=True,
            formatter_class=argparse.RawTextHelpFormatter)
        action_parser.set_defaults(action=name)
        return action_parser

    cluster_parser = _new_action(Action.CLUSTER, 'Start or stop cluster')
    set_parser = _new_action(
        Action.SET,
        'Set configurations for standalone cluster e.g. master or slave nodes')
    template_parser = _new_action(
        Action.TEMPLATE,
        'Template Heron configurations based on cluster roles')

    cluster_parser.add_argument(
        TYPE,
        type=str,
        choices={Cluster.START, Cluster.STOP},
        help=cluster_help)
    template_parser.add_argument(
        TYPE,
        type=str,
        choices={"configs"})

    get_parser = _new_action(
        Action.GET,
        'Get attributes about the standalone cluster')
    get_parser.add_argument(
        TYPE,
        type=str,
        choices={Get.SERVICE_URL, Get.HERON_TRACKER_URL, Get.HERON_UI_URL},
        help=get_help)

    info_parser = _new_action(
        Action.INFO,
        'Get general information about the standalone cluster')

    # Options shared by every action (verbosity, config path, heron dir).
    add_additional_args(
        [set_parser, cluster_parser, template_parser, get_parser, info_parser])

    parser.set_defaults(subcommand='standalone')
    return parser
################################################################################
def run(command, parser, cl_args, unknown_args):
    '''
    Execute the ``standalone`` action selected on the command line.

    :param command: unused
    :param parser: unused
    :param cl_args: parsed arguments; "action" (and, for some actions,
      "type") select what is executed
    :param unknown_args: unused
    :return: a ``SimpleResult`` with status ``Ok``
    :raises ValueError: when the action or its type is not recognised
    '''
    action = cl_args["action"]

    if action == Action.SET:
        # Let the user edit the inventory, then regenerate the configs.
        call_editor(get_inventory_file(cl_args))
        update_config_files(cl_args)
    elif action == Action.TEMPLATE:
        update_config_files(cl_args)
    elif action == Action.INFO:
        print_cluster_info(cl_args)
    elif action == Action.CLUSTER:
        cluster_op = cl_args["type"]
        if cluster_op == Cluster.START:
            start_cluster(cl_args)
        elif cluster_op == Cluster.STOP:
            question = ("Are you sure you want to stop the cluster?"
                        " This will terminate everything running in "
                        "the cluster and remove any scheduler state.")
            if check_sure(cl_args, question):
                stop_cluster(cl_args)
        else:
            raise ValueError("Invalid cluster action %s" % cluster_op)
    elif action == Action.GET:
        attribute = cl_args["type"]
        url_getters = {
            Get.SERVICE_URL: get_service_url,
            Get.HERON_UI_URL: get_heron_ui_url,
            Get.HERON_TRACKER_URL: get_heron_tracker_url,
        }
        getter = url_getters.get(attribute)
        if getter is None:
            raise ValueError("Invalid get action %s" % attribute)
        print(getter(cl_args))
    else:
        raise ValueError("Invalid action %s" % action)

    return SimpleResult(Status.Ok)
################################################################################
def update_config_files(cl_args):
    '''
    Regenerate every templated config file from the current inventory.

    :param cl_args: parsed command line arguments
    '''
    Log.info("Updating config files...")
    parsed = read_and_parse_roles(cl_args)
    Log.debug("roles: %s" % parsed)

    master_nodes = list(parsed[Role.MASTERS])
    zookeeper_nodes = list(parsed[Role.ZOOKEEPERS])

    # Files that only depend on the masters.
    template_slave_hcl(cl_args, master_nodes)
    template_scheduler_yaml(cl_args, master_nodes)
    template_uploader_yaml(cl_args, master_nodes)
    # Files that (also) depend on the zookeeper ensemble.
    template_apiserver_hcl(cl_args, master_nodes, zookeeper_nodes)
    template_statemgr_yaml(cl_args, zookeeper_nodes)
    template_heron_tools_hcl(cl_args, master_nodes, zookeeper_nodes)
##################### Templating functions ######################################
def template_slave_hcl(cl_args, masters):
    '''
    Render slave.hcl with the list of master addresses.

    :param cl_args: parsed arguments; "config_path" locates the files
    :param masters: master addresses to insert, each wrapped in quotes
    '''
    config_root = cl_args["config_path"]
    source = "%s/standalone/templates/slave.template.hcl" % config_root
    target = "%s/standalone/resources/slave.hcl" % config_root

    quoted_masters = ", ".join(['"%s"' % address for address in masters])
    template_file(source, target, {"<nomad_masters:master_port>": quoted_masters})
def template_scheduler_yaml(cl_args, masters):
    '''
    Render scheduler.yaml so it points at the first master on port 4646.

    :param cl_args: parsed arguments; "config_path" locates the files
    :param masters: master addresses; only the first one is used
    '''
    first_master = masters[0]
    config_root = cl_args["config_path"]
    target = "%s/standalone/scheduler.yaml" % config_root
    source = "%s/standalone/templates/scheduler.template.yaml" % config_root

    scheduler_uri = "http://%s:4646" % first_master
    template_file(source, target, {"<scheduler_uri>": scheduler_uri})
def template_uploader_yaml(cl_args, masters):
    '''
    Render uploader.yaml with the upload endpoint of the first master.

    :param cl_args: parsed arguments; "config_path" locates the files
    :param masters: master addresses; only the first one is used
    '''
    first_master = masters[0]
    config_root = cl_args["config_path"]
    source = "%s/standalone/templates/uploader.template.yaml" % config_root
    target = "%s/standalone/uploader.yaml" % config_root

    upload_uri = "http://%s:9000/api/v1/file/upload" % first_master
    template_file(source, target, {"<http_uploader_uri>": upload_uri})
def template_apiserver_hcl(cl_args, masters, zookeepers):
    '''
    Render apiserver.hcl for the first master.

    :param cl_args: parsed arguments; "config_path" locates the files
    :param masters: master addresses; only the first one is used
    :param zookeepers: zookeeper addresses; port 2181 is appended to any
      entry that does not already contain a ":"
    '''
    first_master = masters[0]
    config_root = cl_args["config_path"]
    source = "%s/standalone/templates/apiserver.template.hcl" % config_root
    target = "%s/standalone/resources/apiserver.hcl" % config_root

    hostname = '"%s"' % get_hostname(first_master, cl_args)

    # The API server binary lives in the local bin dir when the master is
    # this machine, otherwise under ~/.heron/bin on the remote host.
    if is_self(first_master):
        executable = '"%s/heron-apiserver"' % config.get_heron_bin_dir()
    else:
        executable = '"%s/.heron/bin/heron-apiserver"' \
                     % get_remote_home(first_master, cl_args)

    zk_endpoints = []
    for zk in zookeepers:
        if ":" in zk:
            zk_endpoints.append('%s' % zk)
        else:
            zk_endpoints.append('%s:2181' % zk)

    template_file(source, target, {
        "<heron_apiserver_hostname>": hostname,
        "<heron_apiserver_executable>": executable,
        "<zookeeper_host:zookeeper_port>": ",".join(zk_endpoints),
        "<scheduler_uri>": "http://%s:4646" % first_master,
    })
def template_statemgr_yaml(cl_args, zookeepers):
    '''
    Render statemgr.yaml with the quoted zookeeper endpoints.

    :param cl_args: parsed arguments; "config_path" locates the files
    :param zookeepers: zookeeper addresses; port 2181 is appended to any
      entry that does not already contain a ":"
    '''
    config_root = cl_args["config_path"]
    source = "%s/standalone/templates/statemgr.template.yaml" % config_root
    target = "%s/standalone/statemgr.yaml" % config_root

    quoted_endpoints = []
    for zk in zookeepers:
        if ":" in zk:
            quoted_endpoints.append('"%s"' % zk)
        else:
            quoted_endpoints.append('"%s:2181"' % zk)

    template_file(source, target,
                  {"<zookeeper_host:zookeeper_port>": ",".join(quoted_endpoints)})
def template_heron_tools_hcl(cl_args, masters, zookeepers):
    '''
    Render heron_tools.hcl (Heron tracker and UI) for the first master.

    :param cl_args: parsed arguments; "config_path" locates the files
    :param masters: master addresses; only the first one is used
    :param zookeepers: zookeeper addresses; port 2181 is appended to any
      entry that does not already contain a ":"
    '''
    config_root = cl_args["config_path"]
    source = "%s/standalone/templates/heron_tools.template.hcl" % config_root
    target = "%s/standalone/resources/heron_tools.hcl" % config_root
    first_master = masters[0]

    zk_endpoints = ",".join(
        [('%s' % zk) if ":" in zk else ('%s:2181' % zk) for zk in zookeepers])
    tracker_executable = '"%s/heron-tracker"' % config.get_heron_bin_dir()
    tools_hostname = '"%s"' % get_hostname(first_master, cl_args)
    ui_executable = '"%s/heron-ui"' % config.get_heron_bin_dir()

    template_file(source, target, {
        "<zookeeper_host:zookeeper_port>": zk_endpoints,
        "<heron_tracker_executable>": tracker_executable,
        "<heron_tools_hostname>": tools_hostname,
        "<heron_ui_executable>": ui_executable,
    })
def template_file(src, dest, replacements_dict):
    '''
    Copy ``src`` to ``dest`` while substituting placeholders.

    Exits the process with status -1 when the rendered text is empty.

    :param src: path of the template file to read
    :param dest: path of the file to (over)write
    :param replacements_dict: mapping of placeholder text to replacement
    '''
    Log.debug("Templating %s - > %s with %s" % (src, dest, replacements_dict))

    with open(src, 'r') as template:
        rendered = template.read()

    for placeholder in replacements_dict:
        rendered = rendered.replace(placeholder, replacements_dict[placeholder])

    if not rendered:
        Log.error("File contents after templating is empty")
        sys.exit(-1)

    with open(dest, 'w') as output:
        output.write(rendered)
        output.truncate()
################################################################################
def get_service_url(cl_args):
    '''
    Return the API server URL (port 9000 on the master).

    :param cl_args: parsed command line arguments
    '''
    master_nodes = list(read_and_parse_roles(cl_args)[Role.MASTERS])
    master = master_nodes[0]
    return "http://%s:9000" % master
def get_heron_tracker_url(cl_args):
    '''
    Return the Heron tracker URL (port 8888 on the master).

    :param cl_args: parsed command line arguments
    '''
    master_nodes = list(read_and_parse_roles(cl_args)[Role.MASTERS])
    master = master_nodes[0]
    return "http://%s:8888" % master
def get_heron_ui_url(cl_args):
    '''
    Return the Heron UI URL (port 8889 on the master).

    :param cl_args: parsed command line arguments
    '''
    master_nodes = list(read_and_parse_roles(cl_args)[Role.MASTERS])
    master = master_nodes[0]
    return "http://%s:8889" % master
def print_cluster_info(cl_args):
    '''
    Print a JSON summary of the cluster: nodes, roles and service URLs.

    :param cl_args: parsed command line arguments
    '''
    parsed = read_and_parse_roles(cl_args)
    node_list = list(parsed[Role.CLUSTER])

    # OrderedDicts pin the key order of the printed JSON document.
    role_section = OrderedDict([
        ('masters', list(parsed[Role.MASTERS])),
        ('slaves', list(parsed[Role.SLAVES])),
        ('zookeepers', list(parsed[Role.ZOOKEEPERS])),
    ])
    url_section = OrderedDict([
        ('serviceUrl', get_service_url(cl_args)),
        ('heronUi', get_heron_ui_url(cl_args)),
        ('heronTracker', get_heron_tracker_url(cl_args)),
    ])
    summary = OrderedDict([
        ('numNodes', len(node_list)),
        ('nodes', node_list),
        ('roles', role_section),
        ('urls', url_section),
    ])

    print(json.dumps(summary, indent=2))
def add_additional_args(parsers):
    '''
    Attach the options shared by all actions to each given parser.

    :param parsers: iterable of argparse parsers to extend with the
      verbose, config and ``--heron-dir`` options
    '''
    for action_parser in parsers:
        cli_args.add_verbose(action_parser)
        cli_args.add_config(action_parser)
        heron_dir_default = config.get_heron_dir()
        action_parser.add_argument(
            '--heron-dir',
            help='Path to Heron home directory',
            default=heron_dir_default)
def stop_cluster(cl_args):
    '''
    Tear down the standalone cluster.

    Deletes every job known to the first master (best effort), then, on
    every master and slave node, kills the heron-nomad processes and removes
    the /tmp/slave and /tmp/master directories.

    :param cl_args: parsed command line arguments
    '''
    def _run_shell(cmd):
        # communicate() drains stdout/stderr while waiting for the process;
        # calling wait() first can block forever once a pipe buffer fills up.
        Log.debug(cmd)
        proc = subprocess.Popen(cmd,
                                shell=True,
                                universal_newlines=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        output = proc.communicate()
        Log.debug("return code: %s output: %s" % (proc.returncode, output))

    Log.info("Terminating cluster...")
    roles = read_and_parse_roles(cl_args)
    masters = roles[Role.MASTERS]
    slaves = roles[Role.SLAVES]
    dist_nodes = masters.union(slaves)

    # stop all jobs
    if masters:
        try:
            single_master = list(masters)[0]
            for job in get_jobs(cl_args, single_master):
                job_id = job["ID"]
                Log.info("Terminating job %s" % job_id)
                delete_job(cl_args, job_id, single_master)
        # Deliberately best effort: an unreachable master (including the
        # sys.exit() raised by get_jobs/delete_job) must not prevent the
        # node clean-up below. KeyboardInterrupt is no longer swallowed.
        except (Exception, SystemExit):  # pylint: disable=broad-except
            Log.debug("Error stopping jobs")
            Log.debug(sys.exc_info()[0])

    local_kill = "ps aux | grep heron-nomad | awk '{print $2}' " \
                 "| xargs kill"
    # The remote variant is wrapped in double quotes by ssh_remote_execute,
    # so "$2" is escaped to stop the local shell from expanding it.
    remote_kill = "ps aux | grep heron-nomad | awk '{print \\$2}' " \
                  "| xargs kill"
    cleanup = "rm -rf /tmp/slave ; rm -rf /tmp/master"

    for node in dist_nodes:
        node_is_local = is_self(node)

        Log.info("Terminating processes on %s" % node)
        if node_is_local:
            _run_shell(local_kill)
        else:
            _run_shell(ssh_remote_execute(remote_kill, node, cl_args))

        Log.info("Cleaning up directories on %s" % node)
        if node_is_local:
            _run_shell(cleanup)
        else:
            _run_shell(ssh_remote_execute(cleanup, node, cl_args))
def start_cluster(cl_args):
    '''
    Bring up a Heron standalone cluster.

    Validates that masters, slaves and zookeepers are defined, regenerates
    the config files, distributes the package to remote nodes when needed
    and then starts masters, slaves, the API server and the Heron tools.
    Exits the process with status -1 when a role has no servers.

    :param cl_args: parsed command line arguments
    '''
    roles = read_and_parse_roles(cl_args)
    masters = roles[Role.MASTERS]
    slaves = roles[Role.SLAVES]
    zookeepers = roles[Role.ZOOKEEPERS]

    Log.info("Roles:")
    Log.info(" - Master Servers: %s" % list(masters))
    Log.info(" - Slave Servers: %s" % list(slaves))
    Log.info(" - Zookeeper Servers: %s" % list(zookeepers))

    required_roles = (
        (masters, "No master servers specified!"),
        (slaves, "No slave servers specified!"),
        (zookeepers, "No zookeeper servers specified!"),
    )
    for servers, complaint in required_roles:
        if not servers:
            Log.error(complaint)
            sys.exit(-1)

    # make sure configs are templated
    update_config_files(cl_args)

    # The package only needs to be shipped when the deployment is not
    # limited to this single machine.
    nodes = list(masters.union(slaves))
    if len(nodes) != 1 or not is_self(nodes[0]):
        distribute_package(roles, cl_args)

    start_master_nodes(masters, cl_args)
    start_slave_nodes(slaves, cl_args)
    start_api_server(masters, cl_args)
    start_heron_tools(masters, cl_args)
    Log.info("Heron standalone cluster complete!")
def start_api_server(masters, cl_args):
    '''
    Start the Heron API server as a Nomad job on the first master.

    Waits for the Nomad master to answer, submits the API server job and
    then waits for the job to be running. Exits the process with status -1
    when the submit command fails.

    :param masters: collection of master addresses; the first is used
    :param cl_args: parsed command line arguments
    '''
    # make sure nomad cluster is up
    single_master = list(masters)[0]
    wait_for_master_to_start(single_master)

    cmd = "%s run %s >> /tmp/apiserver_start.log 2>&1 &" \
          % (get_nomad_path(cl_args), get_apiserver_job_file(cl_args))
    Log.info("Starting Heron API Server on %s" % single_master)

    if not is_self(single_master):
        cmd = ssh_remote_execute(cmd, single_master, cl_args)
    Log.debug(cmd)

    # universal_newlines gives text (not bytes) output for the log messages.
    proc = subprocess.Popen(cmd,
                            shell=True,
                            universal_newlines=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() reads the pipes while waiting, so the child can never
    # block on a full pipe the way it could with wait() + communicate().
    output = proc.communicate()
    return_code = proc.returncode
    Log.debug("return code: %s output: %s" % (return_code, output))

    if return_code != 0:
        Log.error("Failed to start API server on %s with error:\n%s"
                  % (single_master, output[1]))
        sys.exit(-1)

    wait_for_job_to_start(single_master, "apiserver")
    Log.info("Done starting Heron API Server")
def start_heron_tools(masters, cl_args):
    '''
    Start the Heron tracker and UI as a Nomad job on the first master.

    Waits for the Nomad master to answer, submits the heron-tools job and
    then waits for the job to be running. Exits the process with status -1
    when the submit command fails.

    :param masters: collection of master addresses; the first is used
    :param cl_args: parsed command line arguments
    '''
    single_master = list(masters)[0]
    wait_for_master_to_start(single_master)

    cmd = "%s run %s >> /tmp/heron_tools_start.log 2>&1 &" \
          % (get_nomad_path(cl_args), get_heron_tools_job_file(cl_args))
    Log.info("Starting Heron Tools on %s" % single_master)

    if not is_self(single_master):
        cmd = ssh_remote_execute(cmd, single_master, cl_args)
    Log.debug(cmd)

    # universal_newlines gives text (not bytes) output for the log messages.
    proc = subprocess.Popen(cmd,
                            shell=True,
                            universal_newlines=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() reads the pipes while waiting, so the child can never
    # block on a full pipe the way it could with wait() + communicate().
    output = proc.communicate()
    return_code = proc.returncode
    Log.debug("return code: %s output: %s" % (return_code, output))

    if return_code != 0:
        Log.error("Failed to start Heron Tools on %s with error:\n%s"
                  % (single_master, output[1]))
        sys.exit(-1)

    wait_for_job_to_start(single_master, "heron-tools")
    Log.info("Done starting Heron Tools")
def distribute_package(roles, cl_args):
    '''
    Package the Heron directory and copy it to every master and slave node.

    :param roles: role mapping with ``Role.MASTERS`` and ``Role.SLAVES`` sets
    :param cl_args: parsed arguments; "heron_dir" is the directory to ship
    '''
    Log.info("Distributing heron package to nodes (this might take a while)...")
    masters = roles[Role.MASTERS]
    slaves = roles[Role.SLAVES]

    # mkstemp() reserves the file name for us. Taking ".name" from a
    # NamedTemporaryFile that is immediately discarded deletes the file
    # again and leaves the name open for reuse by another process.
    handle, tar_file = tempfile.mkstemp(suffix=".tmp")
    os.close(handle)
    try:
        Log.debug("TAR file %s to %s" % (cl_args["heron_dir"], tar_file))
        make_tarfile(tar_file, cl_args["heron_dir"])
        dist_nodes = masters.union(slaves)
        scp_package(tar_file, dist_nodes, cl_args)
    finally:
        # scp_package only returns (or exits) once every copy has finished,
        # so the local archive is no longer needed at this point.
        if os.path.exists(tar_file):
            os.remove(tar_file)
def wait_for_master_to_start(single_master):
    '''
    Poll the Nomad leader endpoint until the master answers with HTTP 200.

    Makes up to 12 attempts, one second apart, and exits the process with
    status -1 when none of them succeeds.

    :param single_master: address of the Nomad master to poll
    '''
    max_attempts = 12
    # Without a timeout a single hanging connection would block forever and
    # the attempt limit would never be reached.
    request_timeout_secs = 5
    leader_url = "http://%s:4646/v1/status/leader" % single_master

    for attempt in range(max_attempts):
        try:
            response = requests.get(leader_url, timeout=request_timeout_secs)
            if response.status_code == 200:
                return
        # Only network/HTTP errors mean "not up yet"; anything else, Ctrl-C
        # in particular, must propagate.
        except requests.exceptions.RequestException:
            Log.debug(sys.exc_info()[0])
        Log.info("Waiting for cluster to come up... %s" % attempt)
        time.sleep(1)

    Log.error("Failed to start Nomad Cluster!")
    sys.exit(-1)
def wait_for_job_to_start(single_master, job):
    '''
    Poll Nomad until the given job reports the status "running".

    Makes up to 22 attempts, one second apart, and exits the process with
    status -1 when the job never reaches the running state.

    :param single_master: address of the Nomad master to query
    :param job: id of the Nomad job to wait for
    '''
    max_attempts = 22
    # Without a timeout a single hanging connection would block forever and
    # the attempt limit would never be reached.
    request_timeout_secs = 5
    job_url = "http://%s:4646/v1/job/%s" % (single_master, job)

    for attempt in range(max_attempts):
        try:
            response = requests.get(job_url, timeout=request_timeout_secs)
            if response.status_code == 200 and response.json()["Status"] == "running":
                return
            Log.debug("Job %s is not running yet" % job)
        # Network errors and malformed/unexpected payloads both mean "not
        # ready yet"; anything else, Ctrl-C in particular, must propagate.
        except (requests.exceptions.RequestException, ValueError, KeyError, TypeError):
            Log.debug(sys.exc_info()[0])
        Log.info("Waiting for %s to come up... %s" % (job, attempt))
        time.sleep(1)

    Log.error("Failed to start Nomad job %s!" % job)
    sys.exit(-1)
def scp_package(package_file, destinations, cl_args):
    '''
    Copy the package to every remote destination and extract it there.

    The archive is copied to /tmp/heron.tar.gz on each host and unpacked
    into a freshly created ~/.heron. All transfers are started first and
    collected afterwards so they run in parallel. Exits the process with
    status -1 when any transfer fails.

    :param package_file: path of the local archive to copy
    :param destinations: iterable of host addresses; this host is skipped
    :param cl_args: parsed command line arguments
    '''
    file_path = "/tmp/heron.tar.gz"
    remote_cmd = "rm -rf ~/.heron && mkdir ~/.heron " \
                 "&& tar -xzvf %s -C ~/.heron --strip-components 1" % (file_path)

    transfers = []
    for dest in destinations:
        if is_self(dest):
            continue
        Log.info("Server: %s" % dest)
        dest_file_path = "%s:%s" % (dest, file_path)
        cmd = '%s && %s' \
              % (scp_cmd(package_file, dest_file_path, cl_args),
                 ssh_remote_execute(remote_cmd, dest, cl_args))
        Log.debug(cmd)
        # universal_newlines gives text (not bytes) output for the messages.
        proc = subprocess.Popen(cmd,
                                shell=True,
                                universal_newlines=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        transfers.append({"pid": proc, "dest": dest})

    errors = []
    for entry in transfers:
        proc = entry["pid"]
        # "tar -xzvf" lists every extracted file; communicate() drains that
        # output, whereas wait() would block once the pipe buffer is full.
        output = proc.communicate()
        return_code = proc.returncode
        Log.debug("return code: %s output: %s" % (return_code, output))
        if return_code != 0:
            errors.append("Failed to scp package to %s with error:\n%s"
                          % (entry["dest"], output[1]))

    if errors:
        for error in errors:
            Log.error(error)
        sys.exit(-1)

    Log.info("Done distributing packages")
def make_tarfile(output_filename, source_dir):
    '''
    Create a gzip-compressed tar archive of a directory.

    The archive contains a single top-level entry named after the last
    component of ``source_dir``.

    :param output_filename: path of the archive to create
    :param source_dir: directory to archive; a trailing path separator is
      tolerated
    '''
    # normpath() drops a trailing separator. Without it basename() returns
    # "" for "/path/to/dir/", the top-level directory disappears from the
    # archive and extracting with "--strip-components 1" strips real content.
    top_level_name = os.path.basename(os.path.normpath(source_dir))
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=top_level_name)
def start_master_nodes(masters, cl_args):
    '''
    Start a Nomad agent with the master config on every master node.

    All agents are launched first and their results collected afterwards.
    Exits the process with status -1 when any launch command fails.

    :param masters: iterable of master addresses
    :param cl_args: parsed command line arguments
    '''
    launches = []
    for master in masters:
        Log.info("Starting master on %s" % master)
        cmd = "%s agent -config %s >> /tmp/nomad_server_log 2>&1 &" \
              % (get_nomad_path(cl_args), get_nomad_master_config_file(cl_args))
        if not is_self(master):
            cmd = ssh_remote_execute(cmd, master, cl_args)
        Log.debug(cmd)
        # universal_newlines gives text (not bytes) output for the messages.
        proc = subprocess.Popen(cmd,
                                shell=True,
                                universal_newlines=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        launches.append({"pid": proc, "dest": master})

    errors = []
    for entry in launches:
        proc = entry["pid"]
        # communicate() reads the pipes while waiting, so the child cannot
        # block on a full pipe the way it could with wait() + communicate().
        output = proc.communicate()
        return_code = proc.returncode
        Log.debug("return code: %s output: %s" % (return_code, output))
        if return_code != 0:
            errors.append("Failed to start master on %s with error:\n%s"
                          % (entry["dest"], output[1]))

    if errors:
        for error in errors:
            Log.error(error)
        sys.exit(-1)

    Log.info("Done starting masters")
def start_slave_nodes(slaves, cl_args):
    '''
    Start a Nomad agent with the slave config on every slave node.

    All agents are launched first and their results collected afterwards.
    Exits the process with status -1 when any launch command fails.

    :param slaves: iterable of slave addresses
    :param cl_args: parsed command line arguments
    '''
    launches = []
    for slave in slaves:
        Log.info("Starting slave on %s" % slave)
        cmd = "%s agent -config %s >> /tmp/nomad_client.log 2>&1 &" \
              % (get_nomad_path(cl_args), get_nomad_slave_config_file(cl_args))
        if not is_self(slave):
            cmd = ssh_remote_execute(cmd, slave, cl_args)
        Log.debug(cmd)
        # universal_newlines gives text (not bytes) output for the messages.
        proc = subprocess.Popen(cmd,
                                shell=True,
                                universal_newlines=True,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        launches.append({"pid": proc, "dest": slave})

    errors = []
    for entry in launches:
        proc = entry["pid"]
        # communicate() reads the pipes while waiting, so the child cannot
        # block on a full pipe the way it could with wait() + communicate().
        output = proc.communicate()
        return_code = proc.returncode
        Log.debug("return code: %s output: %s" % (return_code, output))
        if return_code != 0:
            errors.append("Failed to start slave on %s with error:\n%s"
                          % (entry["dest"], output[1]))

    if errors:
        for error in errors:
            Log.error(error)
        sys.exit(-1)

    Log.info("Done starting slaves")
def read_and_parse_roles(cl_args):
    '''
    Read the inventory file and derive the role sets from it.

    The first entry of the "cluster" list becomes the only master; every
    cluster entry is a slave. Exits the process with status -1 when the
    file cannot be parsed or a required section is missing or empty.

    :param cl_args: parsed arguments used to locate the inventory file
    :return: dict mapping ``Role.MASTERS``, ``Role.SLAVES``,
      ``Role.ZOOKEEPERS`` and ``Role.CLUSTER`` to sets of addresses
    '''
    with open(get_inventory_file(cl_args), 'r') as stream:
        try:
            # NOTE(review): safe_load replaces the bare yaml.load(stream),
            # which can construct arbitrary Python objects and is rejected
            # outright (missing Loader argument) by PyYAML 6. The inventory
            # only needs plain mappings and lists.
            roles = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            Log.error("Error parsing inventory file: %s" % exc)
            sys.exit(-1)

    # An empty file parses to None and a bare list/scalar is not a mapping
    # either; treat both as "nothing defined" instead of crashing below.
    if not isinstance(roles, dict):
        roles = dict()

    if not roles.get(Role.ZOOKEEPERS):
        Log.error("Zookeeper servers not defined!")
        sys.exit(-1)

    if not roles.get(Role.CLUSTER):
        Log.error("Heron cluster nodes not defined!")
        sys.exit(-1)

    # Set roles
    cluster_nodes = roles[Role.CLUSTER]
    roles[Role.MASTERS] = set([cluster_nodes[0]])
    roles[Role.SLAVES] = set(cluster_nodes)
    roles[Role.ZOOKEEPERS] = set(roles[Role.ZOOKEEPERS])
    roles[Role.CLUSTER] = set(cluster_nodes)
    return roles
def read_file(file_path):
    '''
    Return the meaningful lines of a text file.

    Lines starting with "#" are dropped, newline characters are stripped
    from both ends of the remaining lines and lines that end up empty are
    discarded.

    :param file_path: path of the file to read
    :return: list of the remaining lines, in file order
    '''
    kept = []
    with open(file_path, "r") as handle:
        for raw_line in handle:
            if raw_line.startswith("#"):
                continue
            text = raw_line.strip("\n")
            if text:
                kept.append(text)
    return kept
def call_editor(file_path):
    '''
    Open a file in the user's editor and wait for the editor to exit.

    The editor command is taken from the ``EDITOR`` environment variable
    and defaults to ``vim``.

    :param file_path: path of an existing, writable file to edit
    '''
    import shlex

    # EDITOR commonly carries arguments (e.g. "emacs -nw" or "code --wait");
    # split it so the first word is the program and the rest its arguments.
    editor_argv = shlex.split(os.environ.get('EDITOR', 'vim'))
    if not editor_argv:
        editor_argv = ['vim']

    # Opening in 'r+' mode fails early when the file is missing or not
    # writable, before any editor is launched.
    with open(file_path, 'r+') as tf:
        call(editor_argv + [tf.name])
def get_inventory_file(cl_args):
    '''
    Return the path of the inventory file inside the config directory.

    :param cl_args: parsed arguments; "config_path" is the config directory
    '''
    config_root = cl_args["config_path"]
    return "{0}/standalone/inventory.yaml".format(config_root)
def ssh_remote_execute(cmd, host, cl_args):
    '''
    Build the shell command that runs ``cmd`` on ``host`` through ssh.

    The command is only built, not executed. ``cmd`` is placed between
    double quotes without any further escaping.

    :param cmd: command line to run remotely
    :param host: address of the remote host
    :param cl_args: unused
    :return: the complete ssh command line
    '''
    ssh_options = "-oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null"
    return 'ssh %s %s "%s"' % (ssh_options, host, cmd)
def scp_cmd(src, dest, cl_args):
    '''
    Build the shell command that copies ``src`` to ``dest`` with scp.

    The command is only built, not executed; it ends with a single space.

    :param src: source path
    :param dest: destination, e.g. ``host:/path``
    :param cl_args: unused
    :return: the complete scp command line
    '''
    scp_options = "-oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null"
    return 'scp %s %s %s ' % (scp_options, src, dest)
def get_nomad_path(cl_args):
    '''
    Return the path of the heron-nomad binary in the Heron bin directory.

    :param cl_args: unused
    '''
    bin_dir = config.get_heron_bin_dir()
    return "{0}/heron-nomad".format(bin_dir)
def get_nomad_master_config_file(cl_args):
    '''
    Return the path of master.hcl in the Heron conf directory.

    :param cl_args: unused
    '''
    conf_dir = config.get_heron_conf_dir()
    return "{0}/standalone/resources/master.hcl".format(conf_dir)
def get_nomad_slave_config_file(cl_args):
    '''
    Return the path of slave.hcl in the Heron conf directory.

    :param cl_args: unused
    '''
    conf_dir = config.get_heron_conf_dir()
    return "{0}/standalone/resources/slave.hcl".format(conf_dir)
def get_apiserver_job_file(cl_args):
    '''
    Return the path of apiserver.hcl in the Heron conf directory.

    :param cl_args: unused
    '''
    conf_dir = config.get_heron_conf_dir()
    return "{0}/standalone/resources/apiserver.hcl".format(conf_dir)
def get_heron_tools_job_file(cl_args):
    '''
    Return the path of heron_tools.hcl in the Heron conf directory.

    :param cl_args: unused
    '''
    conf_dir = config.get_heron_conf_dir()
    return "{0}/standalone/resources/heron_tools.hcl".format(conf_dir)
def get_remote_home(host, cl_args):
    '''
    Return the home directory of the current user on ``host``.

    Runs ``echo ~`` locally when ``host`` is this machine and through ssh
    otherwise. Exits the process with status -1 when the command fails.

    :param host: address of the host to query
    :param cl_args: parsed command line arguments
    :return: the home directory path as text, without surrounding newlines
    '''
    cmd = "echo ~"
    if not is_self(host):
        cmd = ssh_remote_execute(cmd, host, cl_args)

    # universal_newlines makes the pipes return text; with bytes output the
    # strip("\n") below raises TypeError on Python 3.
    proc = subprocess.Popen(cmd,
                            shell=True,
                            universal_newlines=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() drains the pipes while waiting for the process to end.
    output = proc.communicate()

    if proc.returncode != 0:
        Log.error("Failed to get home path for remote host %s with output:\n%s"
                  % (host, output))
        sys.exit(-1)

    return output[0].strip("\n")
def get_self_ip():
    '''
    Return the IP address that this machine's host name resolves to.
    '''
    own_name = socket.gethostname()
    return socket.gethostbyname(own_name)
def get_self_hostname():
    '''
    Return the host name of this machine as reported by the socket module.
    '''
    own_name = socket.gethostname()
    return own_name
def get_hostname(ip_addr, cl_args):
    '''
    Return the host name of the machine with the given address.

    Uses the local host name when the address is this machine, otherwise
    runs ``hostname`` on the remote host through ssh. Exits the process
    with status -1 when the remote command fails.

    :param ip_addr: address of the host to query
    :param cl_args: parsed command line arguments
    :return: the host name as text, without surrounding newlines
    '''
    if is_self(ip_addr):
        return get_self_hostname()

    ssh_cmd = ssh_remote_execute("hostname", ip_addr, cl_args)
    # universal_newlines makes the pipes return text; with bytes output the
    # strip("\n") below raises TypeError on Python 3.
    proc = subprocess.Popen(ssh_cmd,
                            shell=True,
                            universal_newlines=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() drains the pipes while waiting for the process to end.
    output = proc.communicate()

    if proc.returncode != 0:
        Log.error("Failed to get hostname for remote host %s with output:\n%s"
                  % (ip_addr, output))
        sys.exit(-1)

    return output[0].strip("\n")
def check_sure(cl_args, prompt):
    '''
    Ask the user a yes/no question and return the answer.

    The question is repeated until a valid answer is given. Answers are
    compared case-insensitively and surrounding whitespace is ignored.

    :param cl_args: unused
    :param prompt: question to show; " (yes/no): " is appended
    :return: True for "y"/"yes"; False for "n"/"no" or when the input
      stream ends before a valid answer was given
    '''
    while True:
        try:
            answer = input("%s" % prompt + ' (yes/no): ')
        except EOFError:
            # No more input to read: never confirm on the user's behalf.
            return False

        answer = answer.strip().lower()
        if answer in ("y", "yes"):
            return True
        if answer in ("n", "no"):
            return False
        # Ask again instead of silently falling through with None.
        print('Invalid input. Please input "yes" or "no"')
def get_jobs(cl_args, nomad_addr):
    '''
    Fetch the list of jobs from a Nomad server.

    Exits the process with status -1 when the server does not answer with
    HTTP 200.

    :param cl_args: unused
    :param nomad_addr: address of the Nomad server (port 4646 is used)
    :return: the decoded JSON body of the response
    '''
    response = requests.get("http://%s:4646/v1/jobs" % nomad_addr)
    if response.status_code == 200:
        return response.json()

    Log.error("Failed to get list of jobs")
    Log.debug("Response: %s" % response)
    sys.exit(-1)
def delete_job(cl_args, job_id, nomad_addr):
    '''
    Stop and purge a job on a Nomad server.

    Exits the process with status -1 when the server does not answer with
    HTTP 200.

    :param cl_args: unused
    :param job_id: id of the Nomad job to delete
    :param nomad_addr: address of the Nomad server (port 4646 is used)
    '''
    # Nomad reads "purge" from the query string of the DELETE request; sent
    # as a form body (data=...) the flag is not part of the URL at all.
    r = requests.delete("http://%s:4646/v1/job/%s" % (nomad_addr, job_id),
                        params={'purge': 'true'})
    if r.status_code != 200:
        Log.error("Failed to delete job %s" % job_id)
        Log.debug("Response: %s" % r)
        sys.exit(-1)
def is_self(addr):
    '''
    Tell whether ``addr`` refers to the machine running this code.

    :param addr: an IPv4 address or host name
    :return: True when ``addr`` equals one of the IPv4 addresses of the
      local network interfaces or the local host name
    '''
    local_ips = [
        ipv4["addr"]
        for iface in netifaces.interfaces()
        for ipv4 in netifaces.ifaddresses(iface).get(netifaces.AF_INET, [])
        if "addr" in ipv4
    ]
    if addr in local_ips:
        return True
    return addr == get_self_hostname()