blob: da228aa1f10af3cd45a6dab24a22cc4f3e6609ea [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' cli_helper.py '''
import logging
import requests
import heron.tools.common.src.python.utils.config as config
from heron.tools.cli.src.python.result import SimpleResult, Status
import heron.tools.cli.src.python.args as args
import heron.tools.cli.src.python.execute as execute
import heron.tools.cli.src.python.jars as jars
import heron.tools.cli.src.python.rest as rest
from heron.common.src.python.utils.log import Log
################################################################################
def create_parser(subparsers, action, help_arg):
    '''
    Register a sub-command parser for a topology-level action.

    :param subparsers: object whose ``add_parser`` method creates the sub-parser
    :param action: sub-command name; also stored as the ``subcommand`` default
    :param help_arg: help text passed to ``add_parser``
    :return: the newly created parser
    '''
    action_parser = subparsers.add_parser(
        action,
        help=help_arg,
        usage="%(prog)s [options] cluster/[role]/[env] <topology-name>",
        add_help=True)

    # Each helper attaches one group of arguments; they are applied in this order.
    option_adders = (
        args.add_titles,
        args.add_cluster_role_env,
        args.add_topology,
        args.add_config,
        args.add_service_url,
        args.add_verbose,
    )
    for add_options in option_adders:
        add_options(action_parser)

    action_parser.set_defaults(subcommand=action)
    return action_parser
################################################################################
def flatten_args(fargs):
    '''
    Convert a mapping of arguments into a flat list of ``(key, value)`` tuples.

    A value that is a ``list`` is expanded into one tuple per element (so an
    empty list contributes nothing); any other value, including tuples and
    ``None``, yields exactly one tuple.

    :param fargs: mapping of argument names to a value or a list of values
    :return: list of ``(key, value)`` tuples in the mapping's iteration order
    '''
    return [
        (key, item)
        for key, value in fargs.items()
        # Wrap scalars so both cases share the same inner loop.
        for item in (value if isinstance(value, list) else [value])
    ]
################################################################################
# pylint: disable=dangerous-default-value
def run_server(command, cl_args, action, extra_args=None):
    '''
    helper function to take action on topologies using REST API

    :param command: key into ``rest.ROUTE_SIGNATURES``; the entry supplies the
      request callable (index 0) and the route template (index 1)
    :param cl_args: parsed command line arguments; reads ``topology-name``,
      ``service_url``, ``cluster``, ``role`` and ``environ``
    :param action: description of action taken, used in the result messages
    :param extra_args: optional mapping of extra request parameters; list
      values are sent as repeated parameters. ``None`` means no extra parameters
    :return: ``SimpleResult`` with ``Status.Ok`` when the server answers with
      an OK status code, ``Status.HeronError`` otherwise
    '''
    topology_name = cl_args['topology-name']
    service_endpoint = cl_args['service_url']

    signature = rest.ROUTE_SIGNATURES[command]
    service_method = signature[0]
    apiroute = signature[1] % (
        cl_args['cluster'],
        cl_args['role'],
        cl_args['environ'],
        topology_name
    )
    service_apiurl = service_endpoint + apiroute

    # convert the dictionary to a list of tuples
    data = flatten_args({} if extra_args is None else extra_args)

    err_msg = "Failed to %s: %s" % (action, topology_name)
    succ_msg = "Successfully %s: %s" % (action, topology_name)

    # Only the network call is guarded here; response handling happens below.
    try:
        r = service_method(service_apiurl, data=data)
    except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as err:
        Log.error(err)
        return SimpleResult(Status.HeronError, err_msg, succ_msg)

    if r.status_code == requests.codes.ok:
        return SimpleResult(Status.Ok, err_msg, succ_msg)

    # The error body is not guaranteed to be a JSON object (it may be empty or
    # an HTML page from a proxy), so fall back to a generic message instead of
    # letting the decode error escape.
    default_msg = "Unknown error from API server %d" % r.status_code
    try:
        payload = r.json()
    except ValueError:
        payload = None
    if isinstance(payload, dict):
        Log.error(payload.get('message', default_msg))
    else:
        Log.error(default_msg)
    return SimpleResult(Status.HeronError, err_msg, succ_msg)
################################################################################
# pylint: disable=dangerous-default-value
def run_direct(command, cl_args, action, extra_args=None, extra_lib_jars=None):
    '''
    helper function to take action on topologies

    Builds the argument list for ``org.apache.heron.scheduler.RuntimeManagerMain``
    and runs it through ``execute.heron_class``.

    :param command: value passed to the runtime manager as ``--command``
    :param cl_args: parsed command line arguments; reads ``topology-name``,
      ``cluster``, ``role``, ``environ``, ``submit_user``, ``config_path`` and
      ``override_config_file``
    :param action: description of action taken, used in the result messages
    :param extra_args: optional additional arguments appended to the runtime
      manager's argument list
    :param extra_lib_jars: optional additional jars appended to the library jars
    :return: the result of ``execute.heron_class`` with the failure/success
      messages attached via ``add_context``
    '''
    topology_name = cl_args['topology-name']

    new_args = [
        "--cluster", cl_args['cluster'],
        "--role", cl_args['role'],
        "--environment", cl_args['environ'],
        "--submit_user", cl_args['submit_user'],
        "--heron_home", config.get_heron_dir(),
        "--config_path", cl_args['config_path'],
        "--override_config_file", cl_args['override_config_file'],
        "--release_file", config.get_heron_release_file(),
        "--topology_name", topology_name,
        "--command", command,
    ]
    if extra_args is not None:
        new_args.extend(extra_args)

    # Copy before extending so the list returned by the config helper is never
    # modified in place.
    lib_jars = list(config.get_heron_libs(jars.scheduler_jars() + jars.statemgr_jars()))
    if extra_lib_jars is not None:
        lib_jars.extend(extra_lib_jars)

    # Forward verbosity only when the CLI itself is logging at DEBUG level.
    if Log.getEffectiveLevel() == logging.DEBUG:
        new_args.append("--verbose")

    # invoke the runtime manager to carry out the requested command
    result = execute.heron_class(
        'org.apache.heron.scheduler.RuntimeManagerMain',
        lib_jars,
        extra_jars=[],
        args=new_args
    )

    err_msg = "Failed to %s: %s" % (action, topology_name)
    succ_msg = "Successfully %s: %s" % (action, topology_name)
    result.add_context(err_msg, succ_msg)
    return result
################################################################################
def run(command, cl_args, action, extra_lib_jars=None):
    '''
    Dispatch a topology action to the REST API or to the direct runtime manager.

    :param command: command forwarded to ``run_server`` or ``run_direct``
    :param cl_args: parsed command line arguments; ``deploy_mode`` selects the path
    :param action: description of action taken, used in the result messages
    :param extra_lib_jars: optional additional jars, used only in direct mode
    :return: whatever the selected helper returns
    '''
    if cl_args['deploy_mode'] == config.SERVER_MODE:
        return run_server(command, cl_args, action, extra_args={})

    # Always hand a real list to run_direct so it never receives None.
    direct_lib_jars = [] if extra_lib_jars is None else extra_lib_jars
    return run_direct(command, cl_args, action, extra_args=[], extra_lib_jars=direct_lib_jars)