Merge branch 'master' into kanishk/ui-bp-metric
diff --git a/heron/tools/common/src/python/access/heron_api.py b/heron/tools/common/src/python/access/heron_api.py
index f9d7cbf..3654682 100644
--- a/heron/tools/common/src/python/access/heron_api.py
+++ b/heron/tools/common/src/python/access/heron_api.py
@@ -74,12 +74,15 @@
 
 gc = "RATE(TS({0},{1},__jvm-gc-collection-time-ms))"
 
+bp = "DEFAULT(0, TS(__stmgr__,*,__time_spent_back_pressure_by_compid/{0}))"
+
 queries = dict(
     cpu=cpu,
     capacity=capacity,
     failures=failures,
     memory=memory,
-    gc=gc
+    gc=gc,
+    bp=bp
 )
 
 
@@ -233,6 +236,26 @@
   comps = lplan['spouts'].keys() + lplan['bolts'].keys()
   raise tornado.gen.Return(comps)
 
+################################################################################
+@tornado.gen.coroutine
+def get_instances(cluster, environ, topology, role=None):
+  '''
+  Get the list of instances for the topology from Heron Nest
+  :param cluster:
+  :param environ:
+  :param topology:
+  :param role:
+  :return:
+  '''
+  params = dict(cluster=cluster, environ=environ, topology=topology)
+  if role is not None:
+    params['role'] = role
+  request_url = tornado.httputil.url_concat(
+      create_url(PHYSICALPLAN_URL_FMT), params)
+  pplan = yield fetch_url_as_json(request_url)
+  instances = pplan['instances'].keys()
+  raise tornado.gen.Return(instances)
+
 
 ################################################################################
 @tornado.gen.coroutine
@@ -705,12 +728,7 @@
     for result in results:
       timelines.extend(result["timeline"])
 
-    result = dict(
-        status="success",
-        starttime=timerange[0],
-        endtime=timerange[1],
-        result=dict(timeline=timelines)
-    )
+    result = self.get_metric_response(timerange, timelines, False)
 
     raise tornado.gen.Return(result)
 
@@ -726,7 +744,6 @@
     :param environ:
     :return:
     '''
-
     components = [component] if component != "*" else (yield get_comps(cluster, environ, topology))
 
     result = {}
@@ -737,21 +754,91 @@
       future = get_metrics(cluster, environ, topology, timerange, max_query)
       futures.append(future)
 
-      results = yield futures
+    results = yield futures
 
-      keys = results[0]["timeline"][0]["data"].keys()
-      timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in results)
+    data = self.compute_max(results)
+
+    result = self.get_metric_response(timerange, data, True)
+
+    raise tornado.gen.Return(result)
+
+  @tornado.gen.coroutine
+  def fetch_bp(self, cluster, metric, topology, component, instance, \
+    timerange, is_max, environ=None):
+    '''
+    :param cluster:
+    :param metric:
+    :param topology:
+    :param component:
+    :param instance:
+    :param timerange:
+    :param is_max:
+    :param environ:
+    :return:
+    '''
+    instances = yield get_instances(cluster, environ, topology)
+    if component != "*":
+      filtered_inst = [instance for instance in instances if instance.split("_")[2] == component]
+    else:
+      filtered_inst = instances
+
+    futures_dict = {}
+    for inst in filtered_inst:
+      query = queries.get(metric).format(inst)
+      futures_dict[inst] = get_metrics(cluster, environ, topology, timerange, query)
+
+    res = yield futures_dict
+
+    if not is_max:
+      timelines = []
+      for key in res:
+        result = res[key]
+        # Replace the stream manager instance name with the component instance name
+        if len(result["timeline"]) > 0:
+          result["timeline"][0]["instance"] = key
+        timelines.extend(result["timeline"])
+      result = self.get_metric_response(timerange, timelines, is_max)
+    else:
+      data = self.compute_max(res.values())
+      result = self.get_metric_response(timerange, data, is_max)
+
+    raise tornado.gen.Return(result)
+
+  # pylint: disable=no-self-use
+  def compute_max(self, multi_ts):
+    '''
+    :param multi_ts:
+    :return:
+    '''
+    if len(multi_ts) > 0 and len(multi_ts[0]["timeline"]) > 0:
+      keys = multi_ts[0]["timeline"][0]["data"].keys()
+      timelines = ([res["timeline"][0]["data"][key] for key in keys] for res in multi_ts)
       values = (max(v) for v in zip(*timelines))
-      data = dict(zip(keys, values))
+      return dict(zip(keys, values))
+    return {}
 
-      result = dict(
+  # pylint: disable=no-self-use
+  def get_metric_response(self, timerange, data, isMax):
+    '''
+    :param timerange:
+    :param data:
+    :param isMax:
+    :return:
+    '''
+    if isMax:
+      return dict(
           status="success",
           starttime=timerange[0],
           endtime=timerange[1],
           result=dict(timeline=[dict(data=data)])
       )
 
-    raise tornado.gen.Return(result)
+    return dict(
+        status="success",
+        starttime=timerange[0],
+        endtime=timerange[1],
+        result=dict(timeline=data)
+    )
 
   # pylint: disable=no-self-use
   def get_query(self, metric, component, instance):
diff --git a/heron/tools/common/src/python/access/query.py b/heron/tools/common/src/python/access/query.py
index 29115f1..ad583b6 100644
--- a/heron/tools/common/src/python/access/query.py
+++ b/heron/tools/common/src/python/access/query.py
@@ -42,3 +42,19 @@
     :return:
     '''
     pass
+
+  def fetch_bp(self, cluster, metric, topology, component, instance, \
+    timerange, is_max, environ=None):
+    '''
+    :param cluster:
+    :param metric:
+    :param topology:
+    :param component:
+    :param instance:
+    :param timerange:
+    :param is_max:
+    :param environ:
+    :return:
+    '''
+    pass
+    
\ No newline at end of file
diff --git a/heron/tools/ui/resources/static/js/plan-stats.js b/heron/tools/ui/resources/static/js/plan-stats.js
index 9420cf0..31c59e1 100644
--- a/heron/tools/ui/resources/static/js/plan-stats.js
+++ b/heron/tools/ui/resources/static/js/plan-stats.js
@@ -60,6 +60,16 @@
         scaleTicks: [0, 500, 1000, 1500, 2000],
         format: function (d) { return d.toFixed(0); }
       },
+      {
+        name: 'BP',
+        metricName: 'bp',
+        get: getAndRenderStats,
+        tooltip: 'Milliseconds spent in back pressure per minute.',
+        legendDescription: 'ms in back pressure per minute',
+        loMedHi: [3000, 12000, 30000],
+        scaleTicks: [0, 7500, 15000, 22500, 30000],
+        format: function (d) { return d.toFixed(0); }
+      }
     ];
 
     var colData = [
diff --git a/heron/tools/ui/src/python/handlers/api/metrics.py b/heron/tools/ui/src/python/handlers/api/metrics.py
index 3ac3f65..34abf13 100644
--- a/heron/tools/ui/src/python/handlers/api/metrics.py
+++ b/heron/tools/ui/src/python/handlers/api/metrics.py
@@ -76,11 +76,17 @@
 
     # fetch the metrics
     futures = {}
-    fetch = query_handler.fetch_max if maxquery else query_handler.fetch
-    for comp in compnames:
-      future = fetch(cluster, metric, topology, component,
-                     instances, timerange, environ)
-      futures[comp] = future
+    if metric == "bp":
+      for comp in compnames:
+        future = query_handler.fetch_bp(cluster, metric, topology, component,
+                                        instances, timerange, maxquery, environ)
+        futures[comp] = future
+    else:
+      fetch = query_handler.fetch_max if maxquery else query_handler.fetch
+      for comp in compnames:
+        future = fetch(cluster, metric, topology, component,
+                       instances, timerange, environ)
+        futures[comp] = future
 
     results = yield futures
     self.write(results[component] if component else results)