Merge branch 'master' into kanishk/ui-bp-metric
diff --git a/heron/tools/common/src/python/access/heron_api.py b/heron/tools/common/src/python/access/heron_api.py
index f9d7cbf..3654682 100644
--- a/heron/tools/common/src/python/access/heron_api.py
+++ b/heron/tools/common/src/python/access/heron_api.py
@@ -74,12 +74,15 @@
gc = "RATE(TS({0},{1},__jvm-gc-collection-time-ms))"
+bp = "DEFAULT(0, TS(__stmgr__,*,__time_spent_back_pressure_by_compid/{0}))"
+
queries = dict(
cpu=cpu,
capacity=capacity,
failures=failures,
memory=memory,
- gc=gc
+ gc=gc,
+ bp=bp
)
@@ -233,6 +236,26 @@
comps = lplan['spouts'].keys() + lplan['bolts'].keys()
raise tornado.gen.Return(comps)
+################################################################################
+@tornado.gen.coroutine
+def get_instances(cluster, environ, topology, role=None):
+ '''
+ Get the list of instances for the topology from Heron Nest
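+ :param cluster: value sent as the `cluster` query parameter of the physical plan request
+ :param environ: value sent as the `environ` query parameter of the physical plan request
+ :param topology: value sent as the `topology` query parameter of the physical plan request
+ :param role: optional; sent as the `role` query parameter only when it is not None
+ :return: the keys of the `instances` entry of the fetched physical plan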
+ '''
+ params = dict(cluster=cluster, environ=environ, topology=topology)
+ if role is not None:
+ params['role'] = role
+ request_url = tornado.httputil.url_concat(
+ create_url(PHYSICALPLAN_URL_FMT), params)
+ pplan = yield fetch_url_as_json(request_url)
+ instances = pplan['instances'].keys()
+ raise tornado.gen.Return(instances)
+
################################################################################
@tornado.gen.coroutine
@@ -705,12 +728,7 @@
for result in results:
timelines.extend(result["timeline"])
- result = dict(
- status="success",
- starttime=timerange[0],
- endtime=timerange[1],
- result=dict(timeline=timelines)
- )
+ result = self.get_metric_response(timerange, timelines, False)
raise tornado.gen.Return(result)
@@ -726,7 +744,6 @@
:param environ:
:return:
'''
-
components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
result = {}
@@ -737,21 +754,91 @@
future = get_metrics(cluster, environ, topology, timerange, max_query)
futures.append(future)
- results = yield futures
+ results = yield futures
- keys = results[0]["timeline"][0]["data"].keys()
- timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in results)
+ data = self.compute_max(results)
+
+ result = self.get_metric_response(timerange, data, True)
+
+ raise tornado.gen.Return(result)
+
+ @tornado.gen.coroutine
+ def fetch_bp(self, cluster, metric, topology, component, instance, \
+ timerange, is_max, environ=None):
+ '''
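+ :param cluster: passed through to get_instances and get_metrics
+ :param metric: key into `queries`; the query template found there is formatted once per instance
+ :param topology: passed through to get_instances and get_metrics
+ :param component: "*" to use every instance, otherwise only instances whose third "_"-separated field equals this value
+ :param instance: not read by this method
+ :param timerange: passed to get_metrics; its first two items become starttime/endtime of the response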
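+ :param is_max: when true, return one timeline holding the per-key maximum across instances; otherwise the instances' timelines concatenated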
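+ :param environ: passed through to get_instances and get_metrics
+ :return: response dict built by get_metric_response (status, starttime, endtime, result)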
+ '''
+ instances = yield get_instances(cluster, environ, topology)
+ if component != "*":
+ filtered_inst = [instance for instance in instances if instance.split("_")[2] == component]
+ else:
+ filtered_inst = instances
+
+ futures_dict = {}
+ for inst in filtered_inst:
+ query = queries.get(metric).format(inst)
+ futures_dict[inst] = get_metrics(cluster, environ, topology, timerange, query)
+
+ res = yield futures_dict
+
+ if not is_max:
+ timelines = []
+ for key in res:
+ result = res[key]
+ # Replacing stream manager instance name with component instance name
+ if len(result["timeline"]) > 0:
+ result["timeline"][0]["instance"] = key
+ timelines.extend(result["timeline"])
+ result = self.get_metric_response(timerange, timelines, is_max)
+ else:
+ data = self.compute_max(res.values())
+ result = self.get_metric_response(timerange, data, is_max)
+
+ raise tornado.gen.Return(result)
+
+ # pylint: disable=no-self-use
+ def compute_max(self, multi_ts):
+ '''
+ :param multi_ts:
+ :return:
+ '''
+ if len(multi_ts) > 0 and len(multi_ts[0]["timeline"]) > 0:
+ keys = multi_ts[0]["timeline"][0]["data"].keys()
+ timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in multi_ts)
values = (max(v) for v in zip(*timelines))
- data = dict(zip(keys, values))
+ return dict(zip(keys, values))
+ return {}
- result = dict(
+ # pylint: disable=no-self-use
+ def get_metric_response(self, timerange, data, isMax):
+ '''
+ :param timerange:
+ :param data:
+ :param isMax:
+ :return:
+ '''
+ if isMax:
+ return dict(
status="success",
starttime=timerange[0],
endtime=timerange[1],
result=dict(timeline=[dict(data=data)])
)
- raise tornado.gen.Return(result)
+ return dict(
+ status="success",
+ starttime=timerange[0],
+ endtime=timerange[1],
+ result=dict(timeline=data)
+ )
# pylint: disable=no-self-use
def get_query(self, metric, component, instance):
diff --git a/heron/tools/common/src/python/access/query.py b/heron/tools/common/src/python/access/query.py
index 29115f1..ad583b6 100644
--- a/heron/tools/common/src/python/access/query.py
+++ b/heron/tools/common/src/python/access/query.py
@@ -42,3 +42,19 @@
:return:
'''
pass
+
+ def fetch_bp(self, cluster, metric, topology, component, instance, \
+ timerange, is_max, environ=None):
+ '''
+ :param cluster:
+ :param metric:
+ :param topology:
+ :param component:
+ :param instance:
+ :param timerange:
+ :param is_max:
+ :param environ:
+ :return:
+ '''
+ pass
+
\ No newline at end of file
diff --git a/heron/tools/ui/resources/static/js/plan-stats.js b/heron/tools/ui/resources/static/js/plan-stats.js
index 9420cf0..31c59e1 100644
--- a/heron/tools/ui/resources/static/js/plan-stats.js
+++ b/heron/tools/ui/resources/static/js/plan-stats.js
@@ -60,6 +60,16 @@
scaleTicks: [0, 500, 1000, 1500, 2000],
format: function (d) { return d.toFixed(0); }
},
+ {
+ name: 'BP',
+ metricName: 'bp',
+ get: getAndRenderStats,
+ tooltip: 'Milliseconds spent in back pressure per minute.',
+ legendDescription: 'ms in back pressure per minute',
+ loMedHi: [3000, 12000, 30000],
+ scaleTicks: [0, 7500, 15000, 22500, 30000],
+ format: function (d) { return d.toFixed(0); }
+ }
];
var colData = [
diff --git a/heron/tools/ui/src/python/handlers/api/metrics.py b/heron/tools/ui/src/python/handlers/api/metrics.py
index 3ac3f65..34abf13 100644
--- a/heron/tools/ui/src/python/handlers/api/metrics.py
+++ b/heron/tools/ui/src/python/handlers/api/metrics.py
@@ -76,11 +76,17 @@
# fetch the metrics
futures = {}
- fetch = query_handler.fetch_max if maxquery else query_handler.fetch
- for comp in compnames:
- future = fetch(cluster, metric, topology, component,
- instances, timerange, environ)
- futures[comp] = future
+ if metric == "bp":
+ for comp in compnames:
+ future = query_handler.fetch_bp(cluster, metric, topology, component,
+ instances, timerange, maxquery, environ)
+ futures[comp] = future
+ else:
+ fetch = query_handler.fetch_max if maxquery else query_handler.fetch
+ for comp in compnames:
+ future = fetch(cluster, metric, topology, component,
+ instances, timerange, environ)
+ futures[comp] = future
results = yield futures
self.write(results[component] if component else results)