blob: c5a52038616ec69a51a4a4a15b19329c231df241 [file] [log] [blame]
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
''' heron.py '''
import logging
import tornado.httpclient
import tornado.gen
from tornado.options import options
from .fetch import fetch_url_as_json
from .query import QueryHandler
# pylint: disable=bad-whitespace
CLUSTER_URL_FMT = "%s/clusters"
TOPOLOGIES_URL_FMT = "%s/topologies"
TOPOLOGIES_STATS_URL_FMT = "%s/states" % TOPOLOGIES_URL_FMT
EXECUTION_STATE_URL_FMT = "%s/executionstate" % TOPOLOGIES_URL_FMT
LOGICALPLAN_URL_FMT = "%s/logicalplan" % TOPOLOGIES_URL_FMT
PHYSICALPLAN_URL_FMT = "%s/physicalplan" % TOPOLOGIES_URL_FMT
PACKINGPLAN_URL_FMT = "%s/packingplan" % TOPOLOGIES_URL_FMT
SCHEDULER_LOCATION_URL_FMT = "%s/schedulerlocation" % TOPOLOGIES_URL_FMT
METRICS_URL_FMT = "%s/metrics" % TOPOLOGIES_URL_FMT
METRICS_QUERY_URL_FMT = "%s/metricsquery" % TOPOLOGIES_URL_FMT
METRICS_TIMELINE_URL_FMT = "%s/metricstimeline" % TOPOLOGIES_URL_FMT
EXCEPTIONS_URL_FMT = "%s/exceptions" % TOPOLOGIES_URL_FMT
EXCEPTION_SUMMARY_URL_FMT = "%s/exceptionsummary" % TOPOLOGIES_URL_FMT
INFO_URL_FMT = "%s/info" % TOPOLOGIES_URL_FMT
PID_URL_FMT = "%s/pid" % TOPOLOGIES_URL_FMT
JSTACK_URL_FMT = "%s/jstack" % TOPOLOGIES_URL_FMT
JMAP_URL_FMT = "%s/jmap" % TOPOLOGIES_URL_FMT
HISTOGRAM_URL_FMT = "%s/histo" % TOPOLOGIES_URL_FMT
FILE_DATA_URL_FMT = "%s/containerfiledata" % TOPOLOGIES_URL_FMT
FILE_DOWNLOAD_URL_FMT = "%s/containerfiledownload" % TOPOLOGIES_URL_FMT
FILESTATS_URL_FMT = "%s/containerfilestats" % TOPOLOGIES_URL_FMT
capacity = "DIVIDE(" \
" DEFAULT(0," \
" MULTIPLY(" \
" TS({0},{1},__execute-count/default)," \
" TS({0},{1},__execute-latency/default)" \
" )" \
" )," \
" 60000000000" \
")"
failures = "DEFAULT(0," \
" DIVIDE(" \
" TS({0},{1},__fail-count/default)," \
" SUM(" \
" DEFAULT(1, TS({0},{1},__execute-count/default))," \
" DEFAULT(0, TS({0},{1},__fail-count/default))" \
" )" \
" )" \
")"
cpu = "DEFAULT(0, TS({0},{1},__jvm-process-cpu-load))"
memory = "DIVIDE(" \
" DEFAULT(0, TS({0},{1},__jvm-memory-used-mb))," \
" DEFAULT(1, TS({0},{1},__jvm-memory-mb-total))" \
")"
gc = "RATE(TS({0},{1},__jvm-gc-collection-time-ms))"
backpressure = "DEFAULT(0, TS(__stmgr__,*,__time_spent_back_pressure_by_compid/{0}))"
queries = dict(
cpu=cpu,
capacity=capacity,
failures=failures,
memory=memory,
gc=gc,
backpressure=backpressure
)
def get_tracker_endpoint():
'''
Get the endpoint for heron tracker
:return:
'''
return options.tracker_url
def create_url(fmt):
'''
Given an URL format, substitute with tracker service endpoint
:param fmt:
:return:
'''
return fmt % get_tracker_endpoint()
@tornado.gen.coroutine
def get_clusters():
'''
:return:
'''
request_url = create_url(CLUSTER_URL_FMT)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_topologies():
'''
Get the list of topologies given a data center from heron tracker
:return:
'''
request_url = create_url(TOPOLOGIES_URL_FMT)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_topologies_states():
'''
Get the list of topologies and their states
:return:
'''
request_url = create_url(TOPOLOGIES_STATS_URL_FMT)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
@tornado.gen.coroutine
def _get_topologies(cluster, role=None, env=None):
endpoint = create_url(TOPOLOGIES_URL_FMT)
params = dict(cluster=cluster)
if role is not None:
params['role'] = role
if env is not None:
params['environ'] = env
request_url = tornado.httputil.url_concat(endpoint, params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
def get_cluster_topologies(cluster):
'''
Get the list of topologies given a cluster
:param cluster:
:return:
'''
return _get_topologies(cluster)
################################################################################
def get_cluster_role_topologies(cluster, role):
'''
Get the list of topologies given a cluster submitted by a given role
:param cluster:
:param role:
:return:
'''
return _get_topologies(cluster, role=role)
################################################################################
def get_cluster_role_env_topologies(cluster, role, env):
'''
Get the list of topologies given a cluster submitted by a given role under a given environment
:param cluster:
:param role:
:param env:
:return:
'''
return _get_topologies(cluster, role=role, env=env)
################################################################################
@tornado.gen.coroutine
def get_execution_state(cluster, environ, topology, role=None):
'''
Get the execution state of a topology in a cluster
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(create_url(EXECUTION_STATE_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_logical_plan(cluster, environ, topology, role=None):
'''
Get the logical plan state of a topology in a cluster
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(LOGICALPLAN_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_comps(cluster, environ, topology, role=None):
'''
Get the list of component names for the topology from Heron Nest
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(LOGICALPLAN_URL_FMT), params)
lplan = yield fetch_url_as_json(request_url)
comps = list(lplan['spouts'].keys()) + list(lplan['bolts'].keys())
raise tornado.gen.Return(comps)
################################################################################
@tornado.gen.coroutine
def get_instances(cluster, environ, topology, role=None):
'''
Get the list of instances for the topology from Heron Nest
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(PHYSICALPLAN_URL_FMT), params)
pplan = yield fetch_url_as_json(request_url)
instances = list(pplan['instances'].keys())
raise tornado.gen.Return(instances)
################################################################################
@tornado.gen.coroutine
def get_packing_plan(cluster, environ, topology, role=None):
'''
Get the packing plan state of a topology in a cluster from tracker
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(PACKINGPLAN_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_physical_plan(cluster, environ, topology, role=None):
'''
Get the physical plan state of a topology in a cluster from tracker
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(PHYSICALPLAN_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_scheduler_location(cluster, environ, topology, role=None):
'''
Get the scheduler location of a topology in a cluster from tracker
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(cluster=cluster, environ=environ, topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(SCHEDULER_LOCATION_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_component_exceptionsummary(cluster, environ, topology, component, role=None):
'''
Get summary of exception for a component
:param cluster:
:param environ:
:param topology:
:param component:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
component=component)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(EXCEPTION_SUMMARY_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_component_exceptions(cluster, environ, topology, component, role=None):
'''
Get exceptions for 'component' for 'topology'
:param cluster:
:param environ:
:param topology:
:param component:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
component=component)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(EXCEPTIONS_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_comp_instance_metrics(cluster, environ, topology, component,
metrics, instances, time_range, role=None):
'''
Get the metrics for some instances of a topology from tracker
:param cluster:
:param environ:
:param topology:
:param component:
:param metrics: dict of display name to cuckoo name
:param instances:
:param time_range: 2-tuple consisting of start and end of range
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
component=component)
if role is not None:
params['role'] = role
# form the fetch url
request_url = tornado.httputil.url_concat(
create_url(METRICS_URL_FMT), params)
# convert a single instance to a list, if needed
all_instances = instances if isinstance(instances, list) else [instances]
# append each metric to the url
for _, metric_name in list(metrics.items()):
request_url = tornado.httputil.url_concat(request_url, dict(metricname=metric_name[0]))
# append each instance to the url
for i in all_instances:
request_url = tornado.httputil.url_concat(request_url, dict(instance=i))
# append the time interval to the url
request_url = tornado.httputil.url_concat(request_url, dict(interval=time_range[1]))
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_comp_metrics(cluster, environ, topology, component,
instances, metricnames, time_range, role=None):
'''
Get the metrics for all the instances of a topology from Heron Nest
:param cluster:
:param environ:
:param topology:
:param component:
:param instances:
:param metricnames: dict of display name to cuckoo name
:param time_range: 2-tuple consisting of start and end of range
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
component=component)
if role is not None:
params['role'] = role
# form the url
request_url = tornado.httputil.url_concat(
create_url(METRICS_URL_FMT), params)
# append each metric to the url
for metric_name in metricnames:
request_url = tornado.httputil.url_concat(request_url, dict(metricname=metric_name))
# append each instance to the url
for instance in instances:
request_url = tornado.httputil.url_concat(request_url, dict(instance=instance))
# append the time interval to the url
request_url = tornado.httputil.url_concat(request_url, dict(interval=time_range[1]))
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_metrics(cluster, environment, topology, timerange, query, role=None):
'''
Get the metrics for a topology from tracker
:param cluster:
:param environment:
:param topology:
:param timerange:
:param query:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environment,
topology=topology,
starttime=timerange[0],
endtime=timerange[1],
query=query)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(METRICS_QUERY_URL_FMT), params
)
logging.info("get_metrics %s", request_url)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
################################################################################
@tornado.gen.coroutine
def get_comp_metrics_timeline(cluster, environ, topology, component,
instances, metricnames, time_range, role=None):
'''
Get the minute-by-minute metrics for all instances of a topology from tracker
:param cluster:
:param environ:
:param topology:
:param component:
:param instances:
:param metricnames: dict of display name to cuckoo name
:param time_range: 2-tuple consisting of start and end of range
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
component=component)
if role is not None:
params['role'] = role
# form the url
request_url = tornado.httputil.url_concat(create_url(METRICS_TIMELINE_URL_FMT), params)
if role is not None:
request_url = tornado.httputil.url_concat(request_url, dict(role=role))
# append each metric to the url
for metric_name in metricnames:
request_url = tornado.httputil.url_concat(request_url, dict(metricname=metric_name))
# append each instance to the url
for instance in instances:
request_url = tornado.httputil.url_concat(request_url, dict(instance=instance))
# append the time interval to the url
request_url = tornado.httputil.url_concat(
request_url, dict(starttime=time_range[0], endtime=time_range[1]))
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
@tornado.gen.coroutine
def get_topology_info(cluster, environ, topology, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(create_url(INFO_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
# Get pid of the instance
@tornado.gen.coroutine
def get_instance_pid(cluster, environ, topology, instance, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param instance:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
instance=instance)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(create_url(PID_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
# Get jstack of instance
@tornado.gen.coroutine
def get_instance_jstack(cluster, environ, topology, instance, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param instance:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
instance=instance)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(JSTACK_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
# Get histogram of active memory objects.
@tornado.gen.coroutine
def get_instance_mem_histogram(cluster, environ, topology, instance, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param instance:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
instance=instance)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(HISTOGRAM_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
# Call heap dump for an instance and save it at /tmp/heap.bin
@tornado.gen.coroutine
def run_instance_jmap(cluster, environ, topology, instance, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param instance:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
instance=instance)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(JMAP_URL_FMT), params)
if role is not None:
request_url = tornado.httputil.url_concat(request_url, dict(role=role))
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
# Get a url to download a file from the container
def get_container_file_download_url(cluster, environ, topology, container,
path, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param container:
:param path:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
container=container,
path=path)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(FILE_DOWNLOAD_URL_FMT), params)
if role is not None:
request_url = tornado.httputil.url_concat(request_url, dict(role=role))
return request_url
# Get file data from the container
@tornado.gen.coroutine
def get_container_file_data(cluster, environ, topology, container,
path, offset, length, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param container:
:param path:
:param offset:
:param length:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
container=container,
path=path,
offset=offset,
length=length)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(
create_url(FILE_DATA_URL_FMT), params)
if role is not None:
request_url = tornado.httputil.url_concat(request_url, dict(role=role))
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
# Get filestats
@tornado.gen.coroutine
def get_filestats(cluster, environ, topology, container, path, role=None):
'''
:param cluster:
:param environ:
:param topology:
:param container:
:param path:
:param role:
:return:
'''
params = dict(
cluster=cluster,
environ=environ,
topology=topology,
container=container,
path=path)
if role is not None:
params['role'] = role
request_url = tornado.httputil.url_concat(create_url(FILESTATS_URL_FMT), params)
raise tornado.gen.Return((yield fetch_url_as_json(request_url)))
class HeronQueryHandler(QueryHandler):
''' HeronQueryHandler '''
@tornado.gen.coroutine
def fetch(self, cluster, metric, topology, component, instance, timerange, environ=None):
'''
:param cluster:
:param metric:
:param topology:
:param component:
:param instance:
:param timerange:
:param environ:
:return:
'''
components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
futures = []
for comp in components:
query = self.get_query(metric, comp, instance)
future = get_metrics(cluster, environ, topology, timerange, query)
futures.append(future)
results = yield futures
timelines = []
for result in results:
timelines.extend(result["timeline"])
result = self.get_metric_response(timerange, timelines, False)
raise tornado.gen.Return(result)
@tornado.gen.coroutine
def fetch_max(self, cluster, metric, topology, component, instance, timerange, environ=None):
'''
:param cluster:
:param metric:
:param topology:
:param component:
:param instance:
:param timerange:
:param environ:
:return:
'''
components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
result = {}
futures = []
for comp in components:
query = self.get_query(metric, comp, instance)
max_query = "MAX(%s)" % query
future = get_metrics(cluster, environ, topology, timerange, max_query)
futures.append(future)
results = yield futures
data = self.compute_max(results)
result = self.get_metric_response(timerange, data, True)
raise tornado.gen.Return(result)
# pylint: disable=unused-argument
@tornado.gen.coroutine
def fetch_backpressure(self, cluster, metric, topology, component, instance, \
timerange, is_max, environ=None):
'''
:param cluster:
:param metric:
:param topology:
:param component:
:param instance:
:param timerange:
:param isMax:
:param environ:
:return:
'''
instances = yield get_instances(cluster, environ, topology)
if component != "*":
filtered_inst = [instance for instance in instances if instance.split("_")[2] == component]
else:
filtered_inst = instances
futures_dict = {}
for inst in filtered_inst:
query = queries.get(metric).format(inst)
futures_dict[inst] = get_metrics(cluster, environ, topology, timerange, query)
res = yield futures_dict
if not is_max:
timelines = []
for key in res:
result = res[key]
# Replacing stream manager instance name with component instance name
if len(result["timeline"]) > 0:
result["timeline"][0]["instance"] = key
timelines.extend(result["timeline"])
result = self.get_metric_response(timerange, timelines, is_max)
else:
data = self.compute_max(list(res.values()))
result = self.get_metric_response(timerange, data, is_max)
raise tornado.gen.Return(result)
# pylint: disable=no-self-use
def compute_max(self, multi_ts):
'''
:param multi_ts:
:return:
'''
# Some components don't have specific metrics such as capacity hence the
# key set is empty. These components are filtered out first.
filtered_ts = [ts for ts in multi_ts if len(ts["timeline"][0]["data"]) > 0]
if len(filtered_ts) > 0 and len(filtered_ts[0]["timeline"]) > 0:
keys = list(filtered_ts[0]["timeline"][0]["data"].keys())
timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in filtered_ts)
values = (max(v) for v in zip(*timelines))
return dict(list(zip(keys, values)))
return {}
# pylint: disable=no-self-use
def get_metric_response(self, timerange, data, isMax):
'''
:param timerange:
:param data:
:param isMax:
:return:
'''
if isMax:
return dict(
status="success",
starttime=timerange[0],
endtime=timerange[1],
result=dict(timeline=[dict(data=data)])
)
return dict(
status="success",
starttime=timerange[0],
endtime=timerange[1],
result=dict(timeline=data)
)
# pylint: disable=no-self-use
def get_query(self, metric, component, instance):
'''
:param metric:
:param component:
:param instance:
:return:
'''
q = queries.get(metric)
return q.format(component, instance)