Deep clean heron-explorer (#3588)
* Deep clean heron-explorer
* use click instead of argparse pattern
* list all topologies instead of just 20
* not strictly the same CLI argument names but same functionality
* add smoke testes for heron-tracker and heron-explorer in local-test-runner
* heron-explorer clean PR feedback
diff --git a/heron/tools/explorer/src/python/BUILD b/heron/tools/explorer/src/python/BUILD
index 4fd76c1..6b10054 100644
--- a/heron/tools/explorer/src/python/BUILD
+++ b/heron/tools/explorer/src/python/BUILD
@@ -6,6 +6,7 @@
reqs = [
"tornado==4.0.2",
"tabulate==0.7.4",
+ "click==7.1.2",
],
deps = [
"//heron/common/src/python:common-py",
diff --git a/heron/tools/explorer/src/python/args.py b/heron/tools/explorer/src/python/args.py
deleted file mode 100644
index c312c35..0000000
--- a/heron/tools/explorer/src/python/args.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' args.py '''
-import os
-import heron.tools.common.src.python.utils.config as config
-
-# default parameter - url to connect to heron tracker
-DEFAULT_TRACKER_URL = "http://127.0.0.1:8888"
-
-
-# add argument for config file path
-def add_config(parser):
- """ add config """
- # the default config path
- default_config_path = config.get_heron_conf_dir()
-
- parser.add_argument(
- '--config-path',
- metavar='(a string; path to cluster config; default: "' + default_config_path + '")',
- default=os.path.join(config.get_heron_dir(), default_config_path))
-
- return parser
-
-
-# pylint: disable=protected-access
-def add_titles(parser):
- """ add titles """
- parser._positionals.title = "Required arguments"
- parser._optionals.title = "Optional arguments"
- return parser
-
-
-def insert_bool(param, command_args):
- """ insert boolean """
- index = 0
- found = False
- for lelem in command_args:
- if lelem == '--' and not found:
- break
- if lelem == param:
- found = True
- break
- index = index + 1
-
- if found:
- command_args.insert(index + 1, 'True')
- return command_args
-
-
-def insert_bool_values(command_line_args):
- """ insert boolean values """
- args1 = insert_bool('--verbose', command_line_args)
- return args1
-
-
-# add optional argument that sets log level to verbose
-def add_verbose(parser):
- """ add optional verbose argument"""
- parser.add_argument(
- '--verbose',
- metavar='(a boolean; default: "false")',
- type=bool,
- default=False)
- return parser
-
-
-def add_tracker_url(parser):
- """ add optional tracker_url argument """
- parser.add_argument(
- '--tracker_url',
- metavar='(tracker url; default: "' + DEFAULT_TRACKER_URL + '")',
- type=str, default=DEFAULT_TRACKER_URL)
- return parser
-
-
-def add_container_id(parser):
- """ add optional argument that specifies container id """
- parser.add_argument(
- '--id',
- help='container ID',
- type=int, metavar='ID')
- return parser
-
-
-def add_component_name(parser):
- """ add optional argument that specifies component name """
- parser.add_argument(
- '--component',
- help='Component name',
- metavar='COMP',
- type=str)
- return parser
-
-
-def add_spouts(parser):
- """ add optional argument that displays spout only """
- parser.add_argument(
- '--spout', help='display spout', action='store_true')
- return parser
-
-
-def add_bolts(parser):
- """ add optional argument that displays bolts only """
- parser.add_argument(
- '--bolt', help='display bolt', action='store_true')
- return parser
-
-
-def add_cluster_role_env(parser):
- """ add argument that specifies topologies location """
- parser.add_argument(
- 'cluster/[role]/[env]', help='Topologies location', type=str,
- metavar='CLUSTER/[ROLE]/[ENV]')
- return parser
-
-
-def add_topology_name(parser):
- """ add argument that specifies topology name """
- parser.add_argument(
- 'topology-name',
- help='Topology name'
- )
- return parser
diff --git a/heron/tools/explorer/src/python/clusters.py b/heron/tools/explorer/src/python/clusters.py
deleted file mode 100644
index bdd208f..0000000
--- a/heron/tools/explorer/src/python/clusters.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' clusters.py '''
-from heron.common.src.python.utils.log import Log
-import heron.tools.explorer.src.python.args as args
-import heron.tools.common.src.python.access.tracker_access as tracker_access
-
-
-def create_parser(subparsers):
- """ create argument parser """
- parser = subparsers.add_parser(
- 'clusters',
- help='Display existing clusters',
- usage="%(prog)s [options]",
- add_help=True)
- args.add_verbose(parser)
- args.add_tracker_url(parser)
- parser.set_defaults(subcommand='clusters')
- return subparsers
-
-# pylint: disable=unused-argument,superfluous-parens
-def run(command, parser, cl_args, unknown_args):
- """ run command """
- try:
- clusters = tracker_access.get_clusters()
- except:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
- print('Available clusters:')
- for cluster in clusters:
- print(' %s' % cluster)
- return True
diff --git a/heron/tools/explorer/src/python/help.py b/heron/tools/explorer/src/python/help.py
deleted file mode 100644
index 6995849..0000000
--- a/heron/tools/explorer/src/python/help.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' help.py '''
-from heron.common.src.python.utils.log import Log
-import heron.tools.common.src.python.utils.config as config
-
-
-# pylint: disable=protected-access
-def create_parser(subparsers):
- """ create parser """
- parser = subparsers.add_parser(
- 'help',
- help='Display help',
- add_help=False)
-
- parser._positionals.title = "Required arguments"
- parser._optionals.title = "Optional arguments"
-
- parser.add_argument(
- 'help-command',
- nargs='?',
- default='help',
- help='provide help for a command')
-
- parser.set_defaults(subcommand='help')
- return parser
-
-
-# pylint: disable=unused-argument,superfluous-parens
-def run(command, parser, args, unknown_args):
- """ run command """
- # get the command for detailed help
- command_help = args['help-command']
-
- # if no command is provided, just print main help
- if command_help == 'help':
- parser.print_help()
- return True
-
- # get the subparser for the specific command
- subparser = config.get_subparser(parser, command_help)
- if subparser:
- print(subparser.format_help())
- return True
- Log.error("Unknown subcommand \'%s\'" % command_help)
- return False
diff --git a/heron/tools/explorer/src/python/logicalplan.py b/heron/tools/explorer/src/python/logicalplan.py
index 3b6fa48..e467dba 100644
--- a/heron/tools/explorer/src/python/logicalplan.py
+++ b/heron/tools/explorer/src/python/logicalplan.py
@@ -19,47 +19,18 @@
# under the License.
''' logicalplan.py '''
+import sys
+
from collections import defaultdict
+
from heron.common.src.python.utils.log import Log
-import heron.tools.explorer.src.python.args as args
+
import heron.tools.common.src.python.access.tracker_access as tracker_access
+
from tabulate import tabulate
-def create_parser(subparsers):
- """ create parser """
- components_parser = subparsers.add_parser(
- 'components',
- help='Display information of a topology\'s components',
- usage="%(prog)s cluster/[role]/[env] topology-name [options]",
- add_help=False)
- args.add_cluster_role_env(components_parser)
- args.add_topology_name(components_parser)
- args.add_spouts(components_parser)
- args.add_bolts(components_parser)
- args.add_verbose(components_parser)
- args.add_tracker_url(components_parser)
- args.add_config(components_parser)
- components_parser.set_defaults(subcommand='components')
-
- return subparsers
-
-
-# pylint: disable=misplaced-bare-raise
-def parse_topo_loc(cl_args):
- """ parse topology location """
- try:
- topo_loc = cl_args['cluster/[role]/[env]'].split('/')
- topo_loc.append(cl_args['topology-name'])
- if len(topo_loc) != 4:
- raise
- return topo_loc
- except Exception:
- Log.error('Error: invalid topology location')
- raise
-
-
-def to_table(components, topo_info):
+def to_table(components, topo_info, component_filter):
""" normalize raw logical plan info to table """
inputs, outputs = defaultdict(list), defaultdict(list)
for ctype, component in list(components.items()):
@@ -76,6 +47,10 @@
# stages is an int so keep going
if ctype == "stages":
continue
+ if component_filter == "spouts" and ctype != "spouts":
+ continue
+ if component_filter == "bolts" and ctype != "bolts":
+ continue
for component_name, component_info in list(component.items()):
row = [ctype[:-1], component_name]
if ctype == 'spouts':
@@ -89,59 +64,13 @@
return info, header
-def filter_bolts(table, header):
- """ filter to keep bolts """
- bolts_info = []
- for row in table:
- if row[0] == 'bolt':
- bolts_info.append(row)
- return bolts_info, header
-
-
-def filter_spouts(table, header):
- """ filter to keep spouts """
- spouts_info = []
- for row in table:
- if row[0] == 'spout':
- spouts_info.append(row)
- return spouts_info, header
-
-
-# pylint: disable=unused-argument,superfluous-parens
-def run(cl_args, compo_type):
+def run(component_type: str, cluster: str, role: str, environment: str, topology: str):
""" run command """
- cluster, role, env = cl_args['cluster'], cl_args['role'], cl_args['environ']
- topology = cl_args['topology-name']
- spouts_only, bolts_only = cl_args['spout'], cl_args['bolt']
try:
- components = tracker_access.get_logical_plan(cluster, env, topology, role)
- topo_info = tracker_access.get_topology_info(cluster, env, topology, role)
- table, header = to_table(components, topo_info)
- if spouts_only == bolts_only:
- print(tabulate(table, headers=header))
- elif spouts_only:
- table, header = filter_spouts(table, header)
- print(tabulate(table, headers=header))
- else:
- table, header = filter_bolts(table, header)
- print(tabulate(table, headers=header))
- return True
+ components = tracker_access.get_logical_plan(cluster, environment, topology, role)
+ topo_info = tracker_access.get_topology_info(cluster, environment, topology, role)
except:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
-
-
-
-def run_components(command, parser, cl_args, unknown_args):
- """ run components command """
- return run(cl_args, 'all')
-
-
-def run_spouts(command, parser, cl_args, unknown_args):
- """ run spouts command """
- return run(cl_args, 'spouts')
-
-
-def run_bolts(command, parser, cl_args, unknown_args):
- """ run bolts command """
- return run(cl_args, 'bolts')
+ Log.error("Fail to connect to tracker")
+ sys.exit(1)
+ table, header = to_table(components, topo_info, component_type)
+ print(tabulate(table, headers=header))
diff --git a/heron/tools/explorer/src/python/main.py b/heron/tools/explorer/src/python/main.py
index a1c8224..be35780 100644
--- a/heron/tools/explorer/src/python/main.py
+++ b/heron/tools/explorer/src/python/main.py
@@ -19,191 +19,186 @@
# under the License.
''' main.py '''
-import argparse
+import logging
+import os
import sys
-import time
import heron.common.src.python.utils.log as log
+import heron.tools.common.src.python.access.tracker_access as tracker_access
import heron.tools.common.src.python.utils.config as config
-import heron.tools.explorer.src.python.args as parse
-import heron.tools.explorer.src.python.clusters as clusters
-# pylint: disable=redefined-builtin
-import heron.tools.explorer.src.python.help as help
import heron.tools.explorer.src.python.logicalplan as logicalplan
-import heron.tools.explorer.src.python.opts as opts
import heron.tools.explorer.src.python.physicalplan as physicalplan
import heron.tools.explorer.src.python.topologies as topologies
-import heron.tools.explorer.src.python.version as version
+
+import click
+
+from tornado.options import define
Log = log.Log
-# pylint: disable=bad-super-call
-class SubcommandHelpFormatter(argparse.RawDescriptionHelpFormatter):
- """ subcommand help message formatter """
- def _format_action(self, action):
- parts = super(argparse.RawDescriptionHelpFormatter,
- self)._format_action(action)
- if action.nargs == argparse.PARSER:
- parts = "\n".join(parts.split("\n")[1:])
- return parts
+DEFAULT_TRACKER_URL = "http://127.0.0.1:8888"
+
+try:
+ click_extra = {"max_content_width": os.get_terminal_size().columns}
+except Exception:
+ click_extra = {}
-################################################################################
-# Main parser
-################################################################################
-def create_parser():
- """ create parser """
- help_epilog = '''Getting more help:
- heron-explorer help <command> Display help and options for <command>\n
- For detailed documentation, go to https://heron.incubator.apache.org'''
+def config_path_option():
+ return click.option(
+ "--config-path",
+ default=config.get_heron_conf_dir(),
+ show_default=True,
+ help="Path to heron's config clusters config directory"
+ )
- parser = argparse.ArgumentParser(
- prog='heron-explorer',
- epilog=help_epilog,
- formatter_class=SubcommandHelpFormatter,
- add_help=False)
+def tracker_url_option():
+ return click.option(
+ "--tracker-url",
+ default=DEFAULT_TRACKER_URL,
+ show_default=True,
+ help="URL to a heron-tracker instance"
+ )
- # sub-commands
- subparsers = parser.add_subparsers(
- title="Available commands",
- metavar='<command> <options>')
+def show_version(_, __, value):
+ if value:
+ config.print_build_info()
+ sys.exit(0)
- # subparser for subcommands related to clusters
- clusters.create_parser(subparsers)
+@click.group(context_settings=click_extra)
+@click.option(
+ "--version",
+ is_flag=True,
+ is_eager=True,
+ expose_value=False,
+ callback=show_version,
+)
+@click.option("-v", "--verbose", count=True)
+def cli(verbose: int):
+ levels = {
+ 0: logging.WARNING,
+ 1: logging.INFO,
+ 2: logging.DEBUG,
+ }
+ log.configure(levels.get(verbose, logging.DEBUG))
- # subparser for subcommands related to logical plan
- logicalplan.create_parser(subparsers)
-
- # subparser for subcommands related to physical plan
- physicalplan.create_parser(subparsers)
-
- # subparser for subcommands related to displaying info
- topologies.create_parser(subparsers)
-
- # subparser for help subcommand
- help.create_parser(subparsers)
-
- # subparser for version subcommand
- version.create_parser(subparsers)
-
- return parser
-
-
-################################################################################
-# Run the command
-################################################################################
-# pylint: disable=too-many-return-statements
-def run(command, *args):
- """ run command """
- # show all clusters
- if command == 'clusters':
- return clusters.run(command, *args)
-
- # show topologies
- if command == 'topologies':
- return topologies.run(command, *args)
-
- # physical plan
- if command == 'containers':
- return physicalplan.run_containers(command, *args)
- if command == 'metrics':
- return physicalplan.run_metrics(command, *args)
-
- # logical plan
- if command == 'components':
- return logicalplan.run_components(command, *args)
- if command == 'spouts':
- return logicalplan.run_spouts(command, *args)
- if command == 'bolts':
- return logicalplan.run_bolts(command, *args)
-
- # help
- if command == 'help':
- return help.run(command, *args)
-
- # version
- if command == 'version':
- return version.run(command, *args)
-
- return 1
-
-
-# pylint: disable=superfluous-parens
-def extract_common_args(command, parser, cl_args):
- """ extract common args """
+@cli.command("clusters")
+@tracker_url_option()
+def cli_clusters(tracker_url: str):
+ define("tracker_url", tracker_url)
try:
- # do not pop like cli because ``topologies`` subcommand still needs it
- cluster_role_env = cl_args['cluster/[role]/[env]']
- config_path = cl_args['config_path']
- except KeyError:
- # if some of the arguments are not found, print error and exit
- subparser = config.get_subparser(parser, command)
- print(subparser.format_help())
- return dict()
- cluster = config.get_heron_cluster(cluster_role_env)
- config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
+ clusters = tracker_access.get_clusters()
+ except:
+ Log.error("Fail to connect to tracker")
+ sys.exit(1)
+ print("Available clusters:")
+ for cluster in clusters:
+ print(f" {cluster}")
- new_cl_args = dict()
- try:
- cluster_tuple = config.parse_cluster_role_env(cluster_role_env, config_path)
- new_cl_args['cluster'] = cluster_tuple[0]
- new_cl_args['role'] = cluster_tuple[1]
- new_cl_args['environ'] = cluster_tuple[2]
- new_cl_args['config_path'] = config_path
- except Exception as e:
- Log.error("Unable to get valid topology location: %s", str(e))
- return dict()
+@cli.command("topologies")
+@tracker_url_option()
+@click.argument("cre", metavar="CLUSTER[/ROLE[/ENV]]")
+def cli_topologies(tracker_url: str, cre: str):
+ """Show the topologies under the given CLUSTER[/ROLE[/ENV]]."""
+ define("tracker_url", tracker_url)
+ topologies.run(
+ cre=cre,
+ )
- cl_args.update(new_cl_args)
- return cl_args
+@cli.command()
+@config_path_option()
+@tracker_url_option()
+@click.option(
+ "--component-type",
+ type=click.Choice(["all", "spouts", "bolts"]),
+ default="all",
+ show_default=True,
+)
+@click.argument("cre", metavar="CLUSTER[/ROLE[/ENV]]")
+@click.argument("topology")
+def logical_plan(
+ config_path: str,
+ cre: str,
+ topology: str,
+ component_type: str,
+ tracker_url: str,
+) -> None:
+ """Show logical plan information for the given topology."""
+ define("tracker_url", tracker_url)
+ cluster = config.get_heron_cluster(cre)
+ cluster_config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
+ cluster, role, environment = config.parse_cluster_role_env(cre, cluster_config_path)
+ logicalplan.run(
+ component_type=component_type,
+ cluster=cluster,
+ role=role,
+ environment=environment,
+ topology=topology,
+ )
-################################################################################
-# Run the command
-################################################################################
-def main(args):
- """ main """
- # create the argument parser
- parser = create_parser()
+@cli.group()
+def physical_plan():
+ pass
- # if no argument is provided, print help and exit
- if not args:
- parser.print_help()
- return 0
+@physical_plan.command()
+@config_path_option()
+@tracker_url_option()
+@click.option("--component", help="name of component to limit metrics to")
+@click.argument("cre", metavar="CLUSTER[/ROLE[/ENV]]")
+@click.argument("topology")
+def metrics(
+ config_path: str,
+ cre: str,
+ tracker_url: str,
+ topology: str,
+ component: str,
+) -> None:
+ define("tracker_url", tracker_url)
+ cluster = config.get_heron_cluster(cre)
+ cluster_config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
+ cluster, role, environment = config.parse_cluster_role_env(cre, cluster_config_path)
- # insert the boolean values for some of the options
- all_args = parse.insert_bool_values(args)
+ physicalplan.run_metrics(
+ cluster=cluster,
+ role=role,
+ environment=environment,
+ component=component,
+ topology=topology,
+ )
- # parse the args
- args, unknown_args = parser.parse_known_args(args=all_args)
- command_line_args = vars(args)
- command = command_line_args['subcommand']
+def validate_container_id(_, __, value):
+ if value is None:
+ return None
+ if value <= 0:
+ raise click.BadParameter("container id must be greather than zero")
+ return value - 1
- if unknown_args:
- Log.error('Unknown argument: %s', unknown_args[0])
- # show help message
- command_line_args['help-command'] = command
- command = 'help'
+@physical_plan.command()
+@config_path_option()
+@tracker_url_option()
+@click.option("--id", "container_id", type=int, help="container id", callback=validate_container_id)
+@click.argument("cre", metavar="CLUSTER[/ROLE[/ENV]]")
+@click.argument("topology")
+def containers(
+ config_path: str,
+ cre: str,
+ tracker_url: str,
+ topology: str,
+ container_id: int,
+) -> None:
+ define("tracker_url", tracker_url)
+ cluster = config.get_heron_cluster(cre)
+ cluster_config_path = config.get_heron_cluster_conf_dir(cluster, config_path)
+ cluster, role, environment = config.parse_cluster_role_env(cre, cluster_config_path)
- if command not in ['help', 'version']:
- opts.set_tracker_url(command_line_args)
- log.set_logging_level(command_line_args)
- if command not in ['topologies', 'clusters']:
- command_line_args = extract_common_args(command, parser, command_line_args)
- if not command_line_args:
- return 1
- Log.info("Using tracker URL: %s", command_line_args["tracker_url"])
-
- # timing command execution
- start = time.time()
- ret = run(command, parser, command_line_args, unknown_args)
- end = time.time()
-
- if command != 'help':
- sys.stdout.flush()
- Log.info('Elapsed time: %.3fs.', (end - start))
-
- return 0 if ret else 1
+ physicalplan.run_containers(
+ cluster=cluster,
+ role=role,
+ environment=environment,
+ container_id=container_id,
+ topology=topology,
+ )
if __name__ == "__main__":
- sys.exit(main(sys.argv[1:]))
+ cli() # pylint: disable=no-value-for-parameter
diff --git a/heron/tools/explorer/src/python/opts.py b/heron/tools/explorer/src/python/opts.py
deleted file mode 100644
index 47c91f0..0000000
--- a/heron/tools/explorer/src/python/opts.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' opts.py '''
-from tornado.options import define
-
-def set_tracker_url(cl_args):
- """ define global Tornado variable """
- define("tracker_url", cl_args["tracker_url"])
diff --git a/heron/tools/explorer/src/python/physicalplan.py b/heron/tools/explorer/src/python/physicalplan.py
index a577ee2..e647ca4 100644
--- a/heron/tools/explorer/src/python/physicalplan.py
+++ b/heron/tools/explorer/src/python/physicalplan.py
@@ -22,62 +22,18 @@
import sys
from heron.common.src.python.utils.log import Log
import heron.tools.common.src.python.access.tracker_access as tracker_access
-import heron.tools.explorer.src.python.args as args
from tabulate import tabulate
-def create_parser(subparsers):
- """ create parser """
- metrics_parser = subparsers.add_parser(
- 'metrics',
- help='Display info of a topology\'s metrics',
- usage="%(prog)s cluster/[role]/[env] topology-name [options]",
- add_help=False)
- args.add_cluster_role_env(metrics_parser)
- args.add_topology_name(metrics_parser)
- args.add_verbose(metrics_parser)
- args.add_tracker_url(metrics_parser)
- args.add_config(metrics_parser)
- args.add_component_name(metrics_parser)
- metrics_parser.set_defaults(subcommand='metrics')
-
- containers_parser = subparsers.add_parser(
- 'containers',
- help='Display info of a topology\'s containers metrics',
- usage="%(prog)s cluster/[role]/[env] topology-name [options]",
- add_help=False)
- args.add_cluster_role_env(containers_parser)
- args.add_topology_name(containers_parser)
- args.add_verbose(containers_parser)
- args.add_tracker_url(containers_parser)
- args.add_config(containers_parser)
- args.add_container_id(containers_parser)
- containers_parser.set_defaults(subcommand='containers')
-
- return subparsers
-
-
-# pylint: disable=misplaced-bare-raise
-def parse_topo_loc(cl_args):
- """ parse topology location """
- try:
- topo_loc = cl_args['cluster/[role]/[env]'].split('/')
- topo_name = cl_args['topology-name']
- topo_loc.append(topo_name)
- if len(topo_loc) != 4:
- raise
- return topo_loc
- except Exception:
- Log.error('Invalid topology location')
- raise
-
-
def to_table(metrics):
""" normalize raw metrics API result to table """
all_queries = tracker_access.metric_queries()
m = tracker_access.queries_map()
- names = list(metrics.values())[0].keys()
+ header = ['container id'] + [m[k] for k in all_queries if k in list(metrics.keys())]
stats = []
+ if not metrics:
+ return stats, header
+ names = list(metrics.values())[0].keys()
for n in names:
info = [n]
for field in all_queries:
@@ -86,120 +42,79 @@
except KeyError:
pass
stats.append(info)
- header = ['container id'] + [m[k] for k in all_queries if k in list(metrics.keys())]
return stats, header
-# pylint: disable=unused-argument,superfluous-parens
-def run_metrics(command, parser, cl_args, unknown_args):
- """ run metrics subcommand """
- cluster, role, env = cl_args['cluster'], cl_args['role'], cl_args['environ']
- topology = cl_args['topology-name']
+def run_metrics(
+ cluster: str,
+ role: str,
+ environment: str,
+ topology: str,
+ component: str,
+) -> None:
+ """Render a table of metrics."""
try:
- result = tracker_access.get_topology_info(cluster, env, topology, role)
- spouts = list(result['physical_plan']['spouts'].keys())
- bolts = list(result['physical_plan']['bolts'].keys())
- components = spouts + bolts
- cname = cl_args['component']
- if cname:
- if cname in components:
- components = [cname]
- else:
- Log.error('Unknown component: \'%s\'' % cname)
- raise
+ result = tracker_access.get_topology_info(cluster, environment, topology, role)
except Exception:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
+ Log.error("Fail to connect to tracker")
+ sys.exit(1)
+ spouts = list(result['physical_plan']['spouts'].keys())
+ bolts = list(result['physical_plan']['bolts'].keys())
+ components = spouts + bolts
+ if component:
+ if component in components:
+ components = [component]
+ else:
+ Log.error(f"Unknown component: {component!r}")
+ sys.exit(1)
cresult = []
for comp in components:
try:
- metrics = tracker_access.get_component_metrics(comp, cluster, env, topology, role)
+ metrics = tracker_access.get_component_metrics(comp, cluster, environment, topology, role)
except:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
+ Log.error("Fail to connect to tracker")
+ sys.exit(1)
stat, header = to_table(metrics)
cresult.append((comp, stat, header))
- for i, (comp, stat, header) in enumerate(cresult):
+ for i, (c, stat, header) in enumerate(cresult):
if i != 0:
print('')
- print('\'%s\' metrics:' % comp)
+ print(f"{c!r} metrics:")
print(tabulate(stat, headers=header))
- return True
-
-# pylint: disable=unused-argument,superfluous-parens
-def run_bolts(command, parser, cl_args, unknown_args):
- """ run bolts subcommand """
- cluster, role, env = cl_args['cluster'], cl_args['role'], cl_args['environ']
- topology = cl_args['topology-name']
+def run_containers(
+ cluster: str,
+ role: str,
+ environment: str,
+ topology: str,
+ container_id: str,
+) -> None:
+ """Render a table of container information."""
try:
- result = tracker_access.get_topology_info(cluster, env, topology, role)
- bolts = list(result['physical_plan']['bolts'].keys())
- bolt_name = cl_args['bolt']
- if bolt_name:
- if bolt_name in bolts:
- bolts = [bolt_name]
- else:
- Log.error('Unknown bolt: \'%s\'' % bolt_name)
- raise
- except Exception:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
- bolts_result = []
- for bolt in bolts:
- try:
- metrics = tracker_access.get_component_metrics(bolt, cluster, env, topology, role)
- stat, header = to_table(metrics)
- bolts_result.append((bolt, stat, header))
- except Exception:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
- for i, (bolt, stat, header) in enumerate(bolts_result):
- if i != 0:
- print('')
- print('\'%s\' metrics:' % bolt)
- print(tabulate(stat, headers=header))
- return True
-
-# pylint: disable=too-many-locals,superfluous-parens
-def run_containers(command, parser, cl_args, unknown_args):
- """ run containers subcommand """
- cluster, role, env = cl_args['cluster'], cl_args['role'], cl_args['environ']
- topology = cl_args['topology-name']
- container_id = cl_args['id']
- try:
- result = tracker_access.get_topology_info(cluster, env, topology, role)
+ result = tracker_access.get_topology_info(cluster, environment, topology, role)
except:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
+ Log.error("Fail to connect to tracker")
+ sys.exit(1)
containers = result['physical_plan']['stmgrs']
all_bolts, all_spouts = set(), set()
- for _, bolts in list(result['physical_plan']['bolts'].items()):
- all_bolts = all_bolts | set(bolts)
- for _, spouts in list(result['physical_plan']['spouts'].items()):
- all_spouts = all_spouts | set(spouts)
- stmgrs = list(containers.keys())
- stmgrs.sort()
+ for bolts in result['physical_plan']['bolts'].values():
+ all_bolts |= set(bolts)
+ for spouts in result['physical_plan']['spouts'].values():
+ all_spouts |= set(spouts)
+ stmgrs = sorted(containers.keys())
if container_id is not None:
- try:
- normalized_cid = container_id - 1
- if normalized_cid < 0:
- raise
- stmgrs = [stmgrs[normalized_cid]]
- except:
- Log.error('Invalid container id: %d' % container_id)
- return False
+ stmgrs = [stmgrs[container_id]]
table = []
- for sid, name in enumerate(stmgrs):
- cid = sid + 1
- host = containers[name]["host"]
- port = containers[name]["port"]
- pid = containers[name]["pid"]
+ for cid, name in enumerate(stmgrs, (container_id + 1 if container_id else 1)):
instances = containers[name]["instance_ids"]
- bolt_nums = len([instance for instance in instances if instance in all_bolts])
- spout_nums = len([instance for instance in instances if instance in all_spouts])
- table.append([cid, host, port, pid, bolt_nums, spout_nums, len(instances)])
+ table.append([
+ cid,
+ containers[name]["host"],
+ containers[name]["port"],
+ containers[name]["pid"],
+ len([1 for instance in instances if instance in all_bolts]),
+ len([1 for instance in instances if instance in all_spouts]),
+ len(instances),
+ ])
headers = ["container", "host", "port", "pid", "#bolt", "#spout", "#instance"]
- sys.stdout.flush()
print(tabulate(table, headers=headers))
- return True
diff --git a/heron/tools/explorer/src/python/topologies.py b/heron/tools/explorer/src/python/topologies.py
index ecb0dfc..d341ec6 100644
--- a/heron/tools/explorer/src/python/topologies.py
+++ b/heron/tools/explorer/src/python/topologies.py
@@ -19,112 +19,42 @@
# under the License.
''' topologies.py '''
+import sys
+
from heron.common.src.python.utils.log import Log
-import heron.tools.explorer.src.python.args as args
import heron.tools.common.src.python.access.tracker_access as tracker_access
+
from tabulate import tabulate
-def create_parser(subparsers):
- ''' create parser '''
- parser = subparsers.add_parser(
- 'topologies',
- help='Display running topologies',
- usage="%(prog)s cluster/[role]/[env] [options]",
- add_help=True)
- args.add_cluster_role_env(parser)
- args.add_verbose(parser)
- args.add_tracker_url(parser)
- args.add_config(parser)
- parser.set_defaults(subcommand='topologies')
- return subparsers
-
-
def to_table(result):
- ''' normalize raw result to table '''
- max_count = 20
- table, count = [], 0
- for role, envs_topos in list(result.items()):
- for env, topos in list(envs_topos.items()):
+ table = []
+ for role, envs_topos in result.items():
+ for env, topos in envs_topos.items():
for topo in topos:
- count += 1
- if count > max_count:
- continue
table.append([role, env, topo])
header = ['role', 'env', 'topology']
- rest_count = 0 if count <= max_count else count - max_count
- return table, header, rest_count
+ return table, header
-# pylint: disable=superfluous-parens
-def show_cluster(cl_args, cluster):
- ''' print topologies information to stdout '''
+def run(cre: str) -> None:
+ """Print all topologies under the given CRE."""
+ cluster, *role_env = cre.split('/')
+ if not role_env:
+ get_topologies = tracker_access.get_cluster_topologies
+ elif len(role_env) == 1:
+ get_topologies = tracker_access.get_cluster_role_topologies
+ elif len(role_env) == 2:
+ get_topologies = tracker_access.get_cluster_role_env_topologies
+ else:
+ Log.error("Invalid topologies selection")
+ sys.exit(1)
try:
- result = tracker_access.get_cluster_topologies(cluster)
- if not result:
- Log.error('No topologies in cluster \'%s\'' % cluster)
- return False
- result = result[cluster]
+ result = get_topologies(cluster, *role_env)
except Exception:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
- table, header, rest_count = to_table(result)
- print('Topologies running in cluster \'%s\'' % cluster)
- if rest_count:
- print(' with %d more...' % rest_count)
+ Log.error("Fail to connect to tracker")
+ sys.exit(1)
+ topologies = result[cluster]
+ table, header = to_table(topologies)
+ print(f"Topologies in {cre}:")
print(tabulate(table, headers=header))
- return True
-
-
-# pylint: disable=superfluous-parens
-def show_cluster_role(cl_args, cluster, role):
- ''' print topologies information to stdout '''
- try:
- result = tracker_access.get_cluster_role_topologies(cluster, role)
- if not result:
- Log.error('Unknown cluster/role \'%s\'' % '/'.join([cluster, role]))
- return False
- result = result[cluster]
- except Exception:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
- table, header, rest_count = to_table(result)
- print('Topologies running in cluster \'%s\' submitted by \'%s\':' % (cluster, role))
- if rest_count:
- print(' with %d more...' % rest_count)
- print(tabulate(table, headers=header))
- return True
-
-
-# pylint: disable=superfluous-parens
-def show_cluster_role_env(cl_args, cluster, role, env):
- ''' print topologies information to stdout '''
- try:
- result = tracker_access.get_cluster_role_env_topologies(cluster, role, env)
- if not result:
- Log.error('Unknown cluster/role/env \'%s\'' % '/'.join([cluster, role, env]))
- return False
- result = result[cluster]
- except Exception:
- Log.error("Fail to connect to tracker: \'%s\'", cl_args["tracker_url"])
- return False
- table, header, rest_count = to_table(result)
- print('Topologies running in cluster \'%s\', submitted by \'%s\', and\
- under environment \'%s\':' % (cluster, role, env))
- if rest_count:
- print(' with %d more...' % rest_count)
- print(tabulate(table, headers=header))
- return True
-
-# pylint: disable=unused-argument
-def run(command, parser, cl_args, unknown_args):
- """ run command """
- location = cl_args['cluster/[role]/[env]'].split('/')
- if len(location) == 1:
- return show_cluster(cl_args, *location)
- if len(location) == 2:
- return show_cluster_role(cl_args, *location)
- if len(location) == 3:
- return show_cluster_role_env(cl_args, *location)
- Log.error('Invalid topologies selection')
- return False
diff --git a/heron/tools/explorer/src/python/version.py b/heron/tools/explorer/src/python/version.py
deleted file mode 100644
index 84773f5..0000000
--- a/heron/tools/explorer/src/python/version.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' version.py '''
-import heron.tools.common.src.python.utils.config as config
-import heron.tools.explorer.src.python.args as args
-
-
-def create_parser(subparsers):
- """ create parser """
- parser = subparsers.add_parser(
- 'version',
- help='Display version',
- usage="%(prog)s",
- add_help=False)
- args.add_titles(parser)
- parser.set_defaults(subcommand='version')
- return parser
-
-# pylint: disable=unused-argument
-def run(command, parser, known_args, unknown_args):
- """ run command """
- config.print_build_info()
- return True
diff --git a/heron/tools/explorer/tests/python/explorer_unittest.py b/heron/tools/explorer/tests/python/explorer_unittest.py
index 72e8b5e..830f773 100644
--- a/heron/tools/explorer/tests/python/explorer_unittest.py
+++ b/heron/tools/explorer/tests/python/explorer_unittest.py
@@ -24,35 +24,13 @@
import unittest
from unittest.mock import Mock
-import heron.tools.common.src.python.access.tracker_access as tracker_access
import heron.tools.explorer.src.python.topologies as topologies
-import heron.tools.explorer.src.python.main as main
-import heron.tools.explorer.src.python.args as args
+from heron.tools.explorer.src.python.main import cli
# pylint: disable=missing-docstring, no-self-use
class ExplorerTest(unittest.TestCase):
''' unit tests '''
- def setUp(self):
- base_path = os.path.dirname(os.path.realpath(__file__))
- info_path = os.path.join(base_path, 'info.json')
- lp_path = os.path.join(base_path, 'logicalplan.json')
- topo_path = os.path.join(base_path, 'topologies.json')
- metrics_path = os.path.join(base_path, 'metrics.json')
- with open(info_path) as f:
- tracker_access.get_topology_info = Mock(return_value=json.load(f))
- with open(lp_path) as f:
- tracker_access.get_logical_plan = Mock(return_value=json.load(f))
- with open(topo_path) as f:
- j = json.load(f)
- tracker_access.get_cluster_topologies = Mock(return_value=j)
- tracker_access.get_cluster_role_topologies = Mock(return_value=j)
- tracker_access.get_cluster_role_env_topologies = Mock(return_value=j)
- with open(metrics_path) as f:
- tracker_access.get_topology_metrics = Mock(return_value=json.load(f))
- clusters = ['nyc', 'london', 'tokyo']
- tracker_access.get_clusters = Mock(return_value=clusters)
-
def sample_topo_result(self):
info = []
info.append(['a1', 'a2', 'a3'])
@@ -72,106 +50,9 @@
tmp[row[1]].append(row[2])
return d, info
- def acc_with_optional_args(self, cl, acc):
- clv = cl + ['--verbose']
- clt = cl + ['--tracker-url="http://a.com"']
- cltv = clv + ['--tracker-url="http://a.com"']
- for cl in clv, clt, cltv:
- acc.append(args.insert_bool_values(cl))
-
- def sample_topo_cls(self):
- clt1 = ['topologies', 'local']
- clt2 = ['topologies', 'local/foo']
- clt3 = ['topologies', 'local/foo/bar']
- all_cl = []
- for cl in clt1, clt2, clt3:
- self.acc_with_optional_args(cl, all_cl)
- return all_cl
-
- def sample_lp_cls(self):
- clco = ['components', 'local/rli/default', 'ExclamationTopology']
- clsp = ['components', 'local/rli/default', 'ExclamationTopology', '--spout']
- clbo = ['components', 'local/rli/default', 'ExclamationTopology', '--bolt']
- good_cls = []
- for cl in clsp, clbo, clco:
- self.acc_with_optional_args(cl, good_cls)
- bad_cl1 = ['spouts', 'local/rli/defult', 'ExclamationTopology']
- return good_cls, bad_cl1
-
- def sample_pp_cls(self):
- clsp = ['metrics', 'local/rli/default', 'ExclamationTopology']
- clco = ['containers', 'local/rli/default', 'ExclamationTopology']
- good_cls = []
- for cl in clsp, clco:
- self.acc_with_optional_args(cl, good_cls)
- good_cls.append(clco + ['--cid 1'])
- bad_cl1 = ['metrics', 'local/rli/defult', 'ExclamationTopology']
- return good_cls, bad_cl1
-
- def sample_cluster_cls(self):
- cl = ['clusters']
- good_cls = []
- self.acc_with_optional_args(cl, good_cls)
- return good_cls
-
- def sample_cli(self):
- clt1 = ['topologies', 'local']
- clt2 = ['topologies', 'local/foo']
- clt3 = ['topologies', 'local/foo/bar']
- clb2 = ['metrics', 'local/foo/bar', 'topo']
- clc1 = ['components', 'local/foo/bar', 'topo']
- clc1 = ['components', 'local/foo/bar', 'topo', '--spout']
- clc1 = ['components', 'local/foo/bar', 'topo', '--bolt']
- clc2 = ['containers', 'local/foo/bar', 'topo']
- cll = [clt1, clt2, clt3, clb2, clc1, clc2]
- all_cl = []
- for cl in cll:
- self.acc_with_optional_args(cl, all_cl)
- # help subcommand
- for sub in ['clusters', 'topologies',
- 'metrics', 'components', 'containers', 'help']:
- all_cl.append(['help', sub])
- return all_cl
-
- def test_topo(self):
- for cl in self.sample_topo_cls():
- self.assertEqual(0, main.main(cl))
-
- def test_lp(self):
- good, _ = self.sample_lp_cls()
- for cl in good:
- self.assertEqual(0, main.main(cl))
- #self.assertEqual(1, main.main(bad))
-
- def test_help(self):
- all_cl = []
- for sub in ['clusters', 'topologies', 'metrics',
- 'components', 'containers', 'help']:
- all_cl.append(['help', sub])
- for cl in all_cl:
- self.assertEqual(0, main.main(cl))
-
- def test_pp(self):
- good, bad = self.sample_pp_cls()
- for cl in good:
- self.assertEqual(0, main.main(cl))
- self.assertEqual(1, main.main(bad))
-
- def test_sample_clusters(self):
- good = self.sample_cluster_cls()
- for cl in good:
- self.assertEqual(0, main.main(cl))
-
def test_topo_result_to_table(self):
d, told = self.sample_topo_result()
- tnew, _, _ = topologies.to_table(d)
+ tnew, _ = topologies.to_table(d)
told.sort()
tnew.sort()
self.assertEqual(told, tnew)
-
- def test_cli_parsing(self):
- parser = main.create_parser()
- clis = self.sample_cli()
- for cli in clis:
- _, unknown_args = parser.parse_known_args(cli)
- self.assertTrue(unknown_args is not None)
diff --git a/integration_test/src/python/local_test_runner/main.py b/integration_test/src/python/local_test_runner/main.py
index 9c4d9c8..5d51937 100644
--- a/integration_test/src/python/local_test_runner/main.py
+++ b/integration_test/src/python/local_test_runner/main.py
@@ -40,6 +40,7 @@
from . import test_kill_tmaster
from . import test_scale_up
from . import test_template
+from . import test_explorer
TEST_CLASSES = [
test_template.TestTemplate,
@@ -49,6 +50,7 @@
test_kill_stmgr_metricsmgr.TestKillStmgrMetricsMgr,
test_scale_up.TestScaleUp,
# test_kill_bolt.TestKillBolt,
+ test_explorer.TestExplorer,
]
# The location of default configure file
diff --git a/integration_test/src/python/local_test_runner/test_explorer.py b/integration_test/src/python/local_test_runner/test_explorer.py
new file mode 100644
index 0000000..9bf6e28
--- /dev/null
+++ b/integration_test/src/python/local_test_runner/test_explorer.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python3
+# -*- encoding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+"""test_kill_stmgr.py"""
+import logging
+
+from contextlib import contextmanager
+from subprocess import run, Popen
+from time import sleep
+
+from . import test_template
+
+
+@contextmanager
+def bound_subprocess(popen: Popen) -> None:
+ with popen:
+ logging.debug(f"starting {popen!r}")
+ try:
+ yield
+ finally:
+ logging.debug(f"killing {popen!r}")
+ popen.kill()
+ popen.communicate()
+
+def run_explorer(*args):
+ cmd = ["heron-explorer", *args]
+ logging.debug(f"running command {cmd!r}")
+ run(cmd, check=True, timeout=5)
+ logging.debug(f"finished command {cmd!r}")
+
+
+class TestExplorer(test_template.TestTemplate):
+
+
+ def execute_test_case(self):
+ from getpass import getuser
+ cre = f"{self.params['cluster']}/{getuser()}/default"
+ topology = self.params["topologyName"]
+ # heron-explorer depens on heron-tracker, so start an instance as it is not started
+ # by heron-cli when running against the local "cluster"
+ with bound_subprocess(Popen(["heron-tracker"])):
+ sleep(2)
+ run_explorer("clusters")
+
+ cre_parts = cre.split("/")
+ for i in range(len(cre_parts)):
+ run_explorer("topologies", "/".join(cre_parts[:i+1]))
+
+ run_explorer("logical-plan", cre, topology)
+ run_explorer("logical-plan", cre, topology, "--component-type=bolts")
+ run_explorer("logical-plan", cre, topology, "--component-type=spouts")
+
+ run_explorer("physical-plan", "containers", cre, topology)
+ run_explorer("physical-plan", "containers", cre, topology, "--id=1")
+ run_explorer("physical-plan", "metrics", cre, topology)