blob: 70fa9da1948114774a9f1646b1bb731bb3485cb1 [file] [log] [blame]
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' main.py '''
import argparse
import os
import signal
import sys
import tornado.httpserver
import tornado.ioloop
import tornado.web
from tornado.options import define
from tornado.httpclient import AsyncHTTPClient
import heron.tools.common.src.python.utils.config as common_config
import heron.common.src.python.utils.log as log
from heron.tools.tracker.src.python import constants
from heron.tools.tracker.src.python import handlers
from heron.tools.tracker.src.python import utils
from heron.tools.tracker.src.python.config import Config, STATEMGRS_KEY
from heron.tools.tracker.src.python.tracker import Tracker
# Module-wide shorthand for the Log object exposed by heron's common log
# utilities; every log call in this file goes through it.
Log = log.Log
class Application(tornado.web.Application):
    """Tornado application that serves the tracker's HTTP endpoints.

    Construction creates a ``Tracker`` from ``config``, calls its
    ``synch_topologies()`` and registers all routes. Every handler other
    than ``MainHandler`` and ``DefaultHandler`` is initialised with the
    shared tracker instance.
    """

    def __init__(self, config):
        """Create the tracker and register the routes.

        Args:
            config: tracker configuration object handed to ``Tracker``.
        """
        # Default request timeout (120 seconds) for AsyncHTTPClient requests.
        AsyncHTTPClient.configure(None, defaults=dict(request_timeout=120.0))
        self.tracker = Tracker(config)
        self.tracker.synch_topologies()

        # (pattern, handler) pairs whose handlers receive the tracker,
        # listed in registration order.
        tracker_routes = (
            (r"/clusters", handlers.ClustersHandler),
            (r"/topologies", handlers.TopologiesHandler),
            (r"/topologies/states", handlers.StatesHandler),
            (r"/topologies/info", handlers.TopologyHandler),
            (r"/topologies/logicalplan", handlers.LogicalPlanHandler),
            (r"/topologies/config", handlers.TopologyConfigHandler),
            (r"/topologies/containerfiledata", handlers.ContainerFileDataHandler),
            (r"/topologies/containerfiledownload", handlers.ContainerFileDownloadHandler),
            (r"/topologies/containerfilestats", handlers.ContainerFileStatsHandler),
            (r"/topologies/physicalplan", handlers.PhysicalPlanHandler),
            (r"/topologies/packingplan", handlers.PackingPlanHandler),
            # Deprecated. See https://github.com/apache/incubator-heron/issues/1754
            (r"/topologies/executionstate", handlers.ExecutionStateHandler),
            (r"/topologies/schedulerlocation", handlers.SchedulerLocationHandler),
            (r"/topologies/metadata", handlers.MetaDataHandler),
            (r"/topologies/runtimestate", handlers.RuntimeStateHandler),
            (r"/topologies/metrics", handlers.MetricsHandler),
            (r"/topologies/metricstimeline", handlers.MetricsTimelineHandler),
            (r"/topologies/metricsquery", handlers.MetricsQueryHandler),
            (r"/topologies/exceptions", handlers.ExceptionHandler),
            (r"/topologies/exceptionsummary", handlers.ExceptionSummaryHandler),
            (r"/machines", handlers.MachinesHandler),
            (r"/topologies/pid", handlers.PidHandler),
            (r"/topologies/jstack", handlers.JstackHandler),
            (r"/topologies/jmap", handlers.JmapHandler),
            (r"/topologies/histo", handlers.MemoryHistogramHandler),
        )

        routes = [(r"/", handlers.MainHandler)]
        routes.extend(
            (pattern, handler, {"tracker": self.tracker})
            for pattern, handler in tracker_routes)
        # Catch-all pattern goes last so it only sees paths that no other
        # route matched.
        routes.append((r"(.*)", handlers.DefaultHandler))

        app_settings = {
            "debug": True,
            "serve_traceback": True,
            # Static files are served from the directory holding this file.
            "static_path": os.path.dirname(__file__),
        }
        super().__init__(routes, **app_settings)
        Log.info("Tracker has started")

    def stop(self):
        """Ask the tracker to stop syncing by calling its ``stop_sync()``."""
        self.tracker.stop_sync()
# pylint: disable=protected-access
class _HelpAction(argparse._HelpAction):
    """Help action that prints the main help plus the help of each subparser."""

    def __call__(self, parser, namespace, values, option_string=None):
        """Print the parser help, then every subparser's help, then exit."""
        parser.print_help()

        # Usually a parser has a single subparsers action, but every one
        # that is registered gets handled.
        for action in parser._actions:
            if not isinstance(action, argparse._SubParsersAction):
                continue
            for command, subparser in action.choices.items():
                print("Subparser '{}'".format(command))
                print(subparser.format_help())

        parser.exit()
# pylint: disable=bad-super-call
class SubcommandHelpFormatter(argparse.RawDescriptionHelpFormatter):
    """Help formatter that lists subcommands without the metavar header line.

    For a subparsers action the standard formatter first emits a line with
    the action's invocation (for example ``{help,version}``) and then one
    line per subcommand. This formatter removes that first line so only the
    per-subcommand lines remain.
    """

    def _format_action(self, action):
        """Return the help text for ``action``.

        Args:
            action: the argparse action being formatted.

        Returns:
            The formatted text; for an action whose ``nargs`` is
            ``argparse.PARSER`` the first line is dropped.
        """
        # Zero-argument super() resolves through this class's own MRO rather
        # than naming the parent class and skipping it during lookup.
        formatted = super()._format_action(action)
        if action.nargs == argparse.PARSER:
            formatted = "\n".join(formatted.split("\n")[1:])
        return formatted
def add_titles(parser):
    """Rename the parser's two built-in argument groups and return the parser.

    The positional group is titled "Required arguments" and the optional
    group "Optional arguments". The same parser object is returned.
    """
    positional_group, optional_group = parser._positionals, parser._optionals
    positional_group.title = "Required arguments"
    optional_group.title = "Optional arguments"
    return parser
def add_arguments(parser):
    """Register the tracker's command-line flags on ``parser`` and return it.

    Flags are added in this order: ``--config-file``, the state manager
    flags (``--type``, ``--name``, ``--rootpath``, ``--tunnelhost``,
    ``--hostport``), ``--port`` and ``--verbose``. Each metavar carries a
    short description of the flag, which is what shows up in the help text.
    """
    conf_path = os.path.join(
        utils.get_heron_tracker_conf_dir(), constants.DEFAULT_CONFIG_FILE)
    parser.add_argument(
        '--config-file',
        metavar='(a string; path to config file; default: "{}")'.format(conf_path),
        default=conf_path)

    # (flag, description prefix, example value, extra add_argument kwargs).
    # None of these flags has a default, so they parse to None when absent.
    state_manager_flags = (
        ('--type',
         '(an string; type of state manager (zookeeper or file, etc.); example: ',
         constants.DEFAULT_STATE_MANAGER_TYPE,
         {'choices': ["file", "zookeeper"]}),
        ('--name',
         '(an string; name to be used for the state manager; example: ',
         constants.DEFAULT_STATE_MANAGER_NAME,
         {}),
        ('--rootpath',
         '(an string; where all the states are stored; example: ',
         constants.DEFAULT_STATE_MANAGER_ROOTPATH,
         {}),
        ('--tunnelhost',
         '(an string; if ssh tunneling needs to be established to connect to it; example: ',
         constants.DEFAULT_STATE_MANAGER_TUNNELHOST,
         {}),
        ('--hostport',
         "(an string; only used to connect to zk, must be of the form 'host:port'; example: ",
         constants.DEFAULT_STATE_MANAGER_HOSTPORT,
         {}),
    )
    for flag, description, example, extra in state_manager_flags:
        parser.add_argument(flag, metavar=description + str(example) + ')', **extra)

    parser.add_argument(
        '--port',
        metavar='(an integer; port to listen; default: {})'.format(constants.DEFAULT_PORT),
        type=int,
        default=constants.DEFAULT_PORT)
    parser.add_argument('--verbose', action='store_true')
    return parser
def create_parsers():
    """Build the tracker's argument parsers.

    Returns:
        A ``(parser, ya_parser)`` tuple. ``parser`` holds the titled option
        flags and has no automatic ``-h``. ``ya_parser`` inherits those
        flags through ``parents`` and adds the ``help`` and ``version``
        subcommands, formatted with ``SubcommandHelpFormatter``.
    """
    base_parser = add_arguments(add_titles(argparse.ArgumentParser(
        epilog='For detailed documentation, go to http://github.com/apache/incubator-heron',
        usage="%(prog)s [options] [help]",
        add_help=False)))

    command_parser = argparse.ArgumentParser(
        parents=[base_parser],
        formatter_class=SubcommandHelpFormatter,
        add_help=False)
    commands = command_parser.add_subparsers(title="Available commands")

    # Choosing the "help" subcommand sets help=True on the parsed namespace.
    commands.add_parser('help', help='Prints help', add_help=False).set_defaults(help=True)
    commands.add_parser('version', help='Prints version', add_help=True)

    return base_parser, command_parser
def define_options(port, config_file):
    """Register ``port`` and ``config_file`` as Tornado global options.

    Each argument becomes the default value of the Tornado option with the
    same name.
    """
    option_defaults = (("port", port), ("config_file", config_file))
    for option_name, default_value in option_defaults:
        define(option_name, default=default_value)
def create_tracker_config(namespace):
    """Build the tracker configuration from the config file and CLI flags.

    Args:
        namespace: dict of parsed command-line values. It must contain
            ``"config_file"`` and may contain the state manager flags
            ``type``, ``name``, ``rootpath``, ``tunnelhost`` and
            ``hostport``.

    Returns:
        The configuration returned by ``utils.parse_config_file``, or
        ``{STATEMGRS_KEY: [{}]}`` when that returns ``None``. Every state
        manager flag whose value is not ``None`` is written into the first
        entry of ``config[STATEMGRS_KEY]``.
    """
    # try to parse the config file if we find one
    config_file = namespace["config_file"]
    config = utils.parse_config_file(config_file)
    if config is None:
        # Lazy %-args: the message is only built when debug logging is on.
        Log.debug("Config file does not exists: %s", config_file)
        config = {STATEMGRS_KEY: [{}]}

    # A parsed file with a missing, null or empty state manager list would
    # make the indexing below raise; use the same single empty entry as the
    # no-config-file case so the command-line flags can still fill it in.
    state_managers = config.get(STATEMGRS_KEY)
    if not state_managers:
        state_managers = [{}]
        config[STATEMGRS_KEY] = state_managers

    # update the config if we have any flags
    first_state_manager = state_managers[0]
    for flag in ("type", "name", "rootpath", "tunnelhost", "hostport"):
        value = namespace.get(flag)
        if value is not None:
            first_state_manager[flag] = value
    return config
def main():
    """Parse the command line, build the tracker application and serve it.

    The ``help`` and ``version`` subcommands print their output and exit;
    any other unrecognised argument logs an error and exits with status 1.
    Otherwise the Tornado HTTP server listens on the configured port and
    the IO loop runs until a SIGINT or SIGTERM handler stops it.
    """
    parser, _ = create_parsers()
    args, leftover = parser.parse_known_args()

    # Whatever the flag parser did not consume is treated as a subcommand.
    if leftover == ['help']:
        parser.print_help()
        parser.exit()
    if leftover == ['version']:
        common_config.print_build_info()
        parser.exit()
    if leftover:
        Log.error('Invalid subcommand')
        sys.exit(1)

    namespace = vars(args)
    log.set_logging_level(namespace)

    port = namespace['port']
    config_file = namespace['config_file']

    # set Tornado global option
    define_options(port, config_file)
    config = Config(create_tracker_config(namespace))

    # create Tornado application
    application = Application(config)

    # pylint: disable=unused-argument
    def signal_handler(signum, frame):
        """Stop the application, then the Tornado IO loop."""
        # start a new line after ^C character because this looks nice
        print('\n', end='')
        application.stop()
        tornado.ioloop.IOLoop.instance().stop()

    # SIGINT and SIGTERM share the same shutdown handler.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_handler)

    Log.info("Running on port: %d", port)
    if config_file:
        Log.info("Using config file: %s", config_file)
    Log.info("Using state manager:\n" + str(config))

    tornado.httpserver.HTTPServer(application).listen(port)
    tornado.ioloop.IOLoop.instance().start()
# Start the tracker only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()