Add tables for component running info (#3394)

* Add tables for component running info

* format capacity utilization
diff --git a/heron/tools/ui/resources/static/js/topologies.js b/heron/tools/ui/resources/static/js/topologies.js
index 9a423e9..1d1a0f9 100644
--- a/heron/tools/ui/resources/static/js/topologies.js
+++ b/heron/tools/ui/resources/static/js/topologies.js
@@ -29,6 +29,8 @@
   countersUrlFlags = {};
 }, 60000);
 
+const ParallelismConfig = "topology.component.parallelism";
+
 var AllExceptions = React.createClass({
   getInitialState: function() {
     return {}
@@ -269,7 +271,7 @@
 
   fetchLplan: function () {
     if (this.state.lplan === undefined) {
-      var fetch_url = this.props.baseUrl +
+      const fetch_url = this.props.baseUrl +
        "/topologies/" +
         this.props.cluster + "/" +
         this.props.environ + "/" +
@@ -392,7 +394,7 @@
 
   fetchCounters: function() {
     if (this.props.hasOwnProperty("comp_name")) {
-      var fetch_url = this.props.baseUrl + "/topologies/metrics?" +
+      const fetch_url = this.props.baseUrl + "/topologies/metrics?" +
         "cluster=" + this.props.cluster + "&" +
         "environ=" + this.props.environ + "&" +
         "topology=" + this.props.topology + "&" +
@@ -450,16 +452,19 @@
       // Fetch all metrics for all the spouts.
       // Must have pplan and metricnames to continue.
       if (this.state.pplan && this.state.lplan) {
+        const fetch_url = this.props.baseUrl + "/topologies/metrics?" +
+          "cluster=" + this.props.cluster + "&" +
+          "environ=" + this.props.environ + "&" +
+          "topology=" + this.props.topology;
+
         var spoutNames = [];
         for (var name in this.state.lplan.spouts) {
           if (this.state.lplan.spouts.hasOwnProperty(name)) {
             spoutNames.push(name);
           }
         }
-        var fetch_url = this.props.baseUrl + "/topologies/metrics?" +
-          "cluster=" + this.props.cluster + "&" +
-          "environ=" + this.props.environ + "&" +
-          "topology=" + this.props.topology;
+
+        // Load metrics for spouts.
         for (var i = 0; i < spoutNames.length; i++) {
           var spout = spoutNames[i];
           for (var j = 0; j < this.state.lplan.spouts[spout].outputs.length; j++) {
@@ -481,6 +486,35 @@
             }
           }
         }
+        // Load metrics for bolts.
+        var boltNames = [];
+        for (var name in this.state.lplan.bolts) {
+          if (this.state.lplan.bolts.hasOwnProperty(name)) {
+            boltNames.push(name);
+          }
+        }
+
+        for (var i = 0; i < boltNames.length; i++) {
+          var bolt = boltNames[i];
+          for (var j = 0; j < this.state.lplan.bolts[bolt].outputs.length; j++) {
+            var streamName = this.state.lplan.bolts[bolt].outputs[j].stream_name;
+            for (var timeRange in this.timeRanges) {
+              metricnameargs = "";
+              for (var boltMetric in this.boltMetricsInput) {
+                if (this.boltMetricsInput.hasOwnProperty(boltMetric)) {
+                  metricnameargs += "&metricname=" + boltMetric + streamName;
+                }
+              }
+              if (this.timeRanges.hasOwnProperty(timeRange) && metricnameargs != "") {
+                var url = fetch_url + metricnameargs +
+                  "&component=" + bolt +
+                  "&interval=" + this.timeRanges[timeRange];
+
+                this.fetchCountersURL(url, timeRange);
+              }
+            }
+          }
+        }
       }
     }
   },
@@ -566,6 +600,8 @@
     return (
       <div>
         <TopologyCounters info={info} />
+        <SpoutRunningInfo info={info} />
+        <BoltRunningInfo info={info} />
         <ComponentCounters info={info} />
         <InstanceCounters info={info} />
       </div>
@@ -573,6 +609,7 @@
   }
 });
 
+
 var TopologyCounters = React.createClass({
   capitalize: function(astr) {
     if (astr) {
@@ -586,9 +623,9 @@
   },
 
   getTopologyMetricsRows: function () {
-    var metrics = this.props.info.metrics;
+    const metrics = this.props.info.metrics;
+    const timeRanges = ["10 mins", "1 hr", "3 hrs", "All Time"];
     var aggregatedMetrics = {};
-    var timeRanges = ["10 mins", "1 hr", "3 hrs", "All Time"];
 
     // Get spout names.
     var spoutNames = [];
@@ -702,6 +739,385 @@
   }
 });
 
+/*
+ * This section contains key running information for all spouts.
+ * It is visible only when no component is selected.
+ */
+var SpoutRunningInfo = React.createClass({
+  capitalize: function(astr) {
+    if (astr) {
+        return astr.charAt(0).toUpperCase() + astr.slice(1);
+    }
+    return undefined;
+  },
+
+  getTitle: function () {
+    return "Spout Running Info: " + this.state.time;
+  },
+
+  getInitialState: function () {
+    return {
+      sortBy: 0,
+      reverse: false,
+      time: "10 mins"  // 10 mins metrics is used.
+    };
+  },
+
+  getSpoutRows: function (time) {
+    const metrics = this.props.info.metrics;
+    const metricNames = ["Emit Count", "Ack Count"];
+    var aggregatedMetrics = {};
+    var spoutNames = [];
+
+    // Get spout names.
+    var spouts = this.props.info.pplan.spouts;
+    for (var name in spouts) {
+      if (spouts.hasOwnProperty(name)) {
+        spoutNames.push(name);
+      }
+    }
+
+    for (var s = 0; s < spoutNames.length; s++) {
+      var spoutName = spoutNames[s];
+      // Init results to "_"
+      aggregatedMetrics[spoutName] = {};
+      for (var i in metricNames) {
+        var metricName = metricNames[i];
+        aggregatedMetrics[spoutName][metricName] = "_";
+      }
+      // Aggregate
+      if (metrics[spoutName].hasOwnProperty(time)) {
+        // For all metrics
+        for (var i in metricNames) {
+          var metricName = metricNames[i];
+          var count = 0;
+          var sum = 0;
+          if (metrics[spoutName][time].hasOwnProperty(metricName)) {
+            // For all streams/instances
+            for (var stream in metrics[spoutName][time][metricName]) {
+              if (metrics[spoutName][time][metricName].hasOwnProperty(stream)) {
+                var allMetrics = metrics[spoutName][time][metricName][stream].metrics;
+                for (var m in allMetrics) {
+                  if (allMetrics.hasOwnProperty(m)) {
+                    var v = Number(allMetrics[m]) / (metrics[spoutName][time][metricName][stream].scaleDevisor || 1);
+                    count++;
+                    sum += v;
+                  }
+                }
+              }
+            }
+          }
+          if (count > 0) {
+            aggregatedMetrics[spoutName][metricName] = sum;
+          }
+        }
+      }
+    }
+
+    var rows = [];
+    for (var id in spoutNames) {
+      var spoutName = spoutNames[id];
+      var row = [];
+      row.push(spoutName);
+      // Put parallelism
+      var spouts = this.props.info.lplan.spouts;
+      var parallelism = spouts[spoutName]["config"][ParallelismConfig];
+      row.push(parallelism);
+
+      // Put metrics
+      for (var i in metricNames) {
+        var metricName = metricNames[i];
+        row.push(aggregatedMetrics[spoutName][metricName]);
+      }
+
+      rows.push(row);
+    }
+    return rows;
+  },
+
+  render: function () {
+    if (this.props.info.comp_name) {
+      // A component is selected, return empty div.
+      return (<div id="spoutrunninginfo"></div>)
+    }
+    var title = this.getTitle();
+    var rows = this.getSpoutRows(this.state.time);
+
+    var reverse = this.state.reverse;
+    var sortKey = this.state.sortBy;
+    rows.sort(function (a, b) {
+      var aVal = a[sortKey];
+      var bVal = b[sortKey];
+      return (typeof aVal === "string" ? aVal.localeCompare(bVal) : (bVal - aVal)) * (reverse ? 1 : -1);
+    });
+    var setState = this.setState.bind(this);
+
+    const headings = ["Spout", "Parallelism", "Emit Count", "Ack Count"];
+
+    return (
+      <div id="spoutrunninginfo">
+        <div className="widget-header">
+          <div className="title">
+            <h4 style={{
+              "display": "inline-block",
+              "float": "left",
+              "margin-right": "10px"
+            }}>{title}</h4>
+            <div style={{
+              "padding-top": "10px",
+              "padding-bottom": "10px",
+            }}>
+            </div>
+          </div>
+        </div>
+        <table className="table table-striped table-hover no-margin">
+          <thead>
+            <tr>
+              {headings.map(function (heading, i) {
+                var classNameVals = [
+                  'sort',
+                  ((sortKey === i) && reverse) ? 'asc' : '',
+                  ((sortKey === i) && !reverse) ? 'desc' : ''
+                ].join(' ');
+                function clicked() {
+                  setState({
+                    sortBy: i,
+                    reverse: i === sortKey ? (!reverse) : true
+                  });
+                }
+                return <th key={i} className={classNameVals} onClick={clicked}>{heading}</th>;
+              })}
+            </tr>
+          </thead>
+          <tbody>
+            {rows.map(function (row) {
+              return <tr key={row[0]}>{
+                row.map(function (value, i) {
+                  return <td className="col-md-2" key={i}>{value}</td>;
+                })}</tr>;
+            })}
+          </tbody>
+        </table>
+      </div>
+    );
+  }
+});
+
+/*
+ * This section contains key running information for all bolts.
+ * It is visible only when no component is selected.
+ */
+var BoltRunningInfo = React.createClass({
+  capitalize: function(astr) {
+    if (astr) {
+        return astr.charAt(0).toUpperCase() + astr.slice(1);
+    }
+    return undefined;
+  },
+
+  getTitle: function () {
+    return "Bolt Running Info: " + this.state.time;
+  },
+
+  getInitialState: function () {
+    return {
+      sortBy: 0,
+      reverse: false,
+      time: "10 mins"  // 10 mins metrics is used.
+    };
+  },
+
+  getBoltRows: function (time) {
+    const metrics = this.props.info.metrics;
+    const metricNames = ["Execute Count", "Fail Count"];
+    const minUtilizationName = "Min Capacity Utilization";
+    const maxUtilizationName = "Max Capacity Utilization";
+    var aggregatedMetrics = {};
+    var boltNames = [];
+
+    // Get bolt names.
+    var bolts = this.props.info.pplan.bolts;
+    for (var name in bolts) {
+      if (bolts.hasOwnProperty(name)) {
+        boltNames.push(name);
+      }
+    }
+
+    for (var b = 0; b < boltNames.length; b++) {
+      var boltName = boltNames[b];
+      // Init results to "_"
+      aggregatedMetrics[boltName] = {};
+      for (var i in metricNames) {
+        var metricName = metricNames[i];
+        aggregatedMetrics[boltName][metricName] = "_";
+      }
+      aggregatedMetrics[boltName][minUtilizationName] = "_";
+      aggregatedMetrics[boltName][maxUtilizationName] = "_";
+
+      // Aggregate
+      if (metrics[boltName].hasOwnProperty(time)) {
+        // For all metrics
+        for (var i in metricNames) {
+          var metricName = metricNames[i];
+          var count = 0;
+          var sum = 0;
+          if (metrics[boltName][time].hasOwnProperty(metricName)) {
+            // For all streams/instances
+            for (var stream in metrics[boltName][time][metricName]) {
+              if (metrics[boltName][time][metricName].hasOwnProperty(stream)) {
+                var allMetrics = metrics[boltName][time][metricName][stream].metrics;
+                for (var m in allMetrics) {
+                  if (allMetrics.hasOwnProperty(m)) {
+                    var v = Number(allMetrics[m]) / (metrics[boltName][time][metricName][stream].scaleDevisor || 1);
+                    count++;
+                    sum += v;
+                  }
+                }
+              }
+            }
+          }
+          if (count > 0) {
+            aggregatedMetrics[boltName][metricName] = sum;
+          }
+        }
+        // For capacity utilization. Manually calculate it by execution count times latency.
+        const executeCountMetricName = "Execute Count";
+        const executeLatencyMetricName = "Execute Latency (ms)";
+        if (metrics[boltName][time].hasOwnProperty(executeCountMetricName) &&
+            metrics[boltName][time].hasOwnProperty(executeLatencyMetricName)) {
+          // For all streams/instances
+          var instanceUtilization = {};  // milliseconds spend on execution, in the window.
+          for (var stream in metrics[boltName][time][executeCountMetricName]) {
+            if (metrics[boltName][time][executeCountMetricName].hasOwnProperty(stream) &&
+                metrics[boltName][time][executeLatencyMetricName].hasOwnProperty(stream)) {
+              var countMetrics = metrics[boltName][time][executeCountMetricName][stream].metrics;
+              var latencyMetrics = metrics[boltName][time][executeLatencyMetricName][stream].metrics;
+              // For each intance
+              for (var m in countMetrics) {
+                if (countMetrics.hasOwnProperty(m) && latencyMetrics.hasOwnProperty(m)) {
+                  var count = Number(countMetrics[m]) / (metrics[boltName][time][executeCountMetricName][stream].scaleDevisor || 1);
+                  var latency = Number(latencyMetrics[m]) / (metrics[boltName][time][executeLatencyMetricName][stream].scaleDevisor || 1);
+                  var utilization = count * latency;
+
+                  instanceUtilization[m] = (instanceUtilization[m] || 0) + utilization;
+                }
+              }
+            }
+          }
+          // Calculate min/max.
+          var min = -1;
+          var max = -1;
+          for (var i in instanceUtilization) {
+            if (instanceUtilization.hasOwnProperty(i)) {
+              var utilization = instanceUtilization[i] * 100 / (10 * 60 * 1000);   // Divide by the time window and get percentage.
+              if (min === -1 || min > utilization) {
+                min = utilization;
+              }
+              if (max === -1 || max < utilization) {
+                max = utilization;
+              }
+            }
+          }
+
+          if (min !== -1) {
+            aggregatedMetrics[boltName][minUtilizationName] = min.toFixed(2) + "%";
+            aggregatedMetrics[boltName][maxUtilizationName] = max.toFixed(2) + "%";
+          }
+        }
+      }
+    }
+
+    var rows = [];
+    for (var id in boltNames) {
+      var boltName = boltNames[id];
+      var row = [];
+      row.push(boltName);
+      // Put parallelism
+      var bolts = this.props.info.lplan.bolts;
+      var parallelism = bolts[boltName]["config"][ParallelismConfig];
+      row.push(parallelism);
+
+      // Put metrics
+      for (var i in metricNames) {
+        var metricName = metricNames[i];
+        row.push(aggregatedMetrics[boltName][metricName]);
+      }
+      row.push(aggregatedMetrics[boltName][minUtilizationName]);
+      row.push(aggregatedMetrics[boltName][maxUtilizationName]);
+
+      rows.push(row);
+    }
+    return rows;
+  },
+
+  render: function () {
+    if (this.props.info.comp_name) {
+      // A component is selected, return empty div.
+      return (<div id="boltrunninginfo"></div>)
+    }
+    var title = this.getTitle();
+    var rows = this.getBoltRows(this.state.time);
+
+    var reverse = this.state.reverse;
+    var sortKey = this.state.sortBy;
+    rows.sort(function (a, b) {
+      var aVal = a[sortKey];
+      var bVal = b[sortKey];
+      return (typeof aVal === "string" ? aVal.localeCompare(bVal) : (bVal - aVal)) * (reverse ? 1 : -1);
+    });
+    var setState = this.setState.bind(this);
+
+    const headings = ["Bolt", "Parallelism", "Execute Count", "Failure Count",
+                      "Capacity Utilization(min)", "Capacity Utilization(max)"];
+    return (
+      <div id="componentrunninginfo">
+        <div className="widget-header">
+          <div className="title">
+            <h4 style={{
+              "display": "inline-block",
+              "float": "left",
+              "margin-right": "10px"
+            }}>{title}</h4>
+            <div style={{
+              "padding-top": "10px",
+              "padding-bottom": "10px",
+            }}>
+            </div>
+          </div>
+        </div>
+        <table className="table table-striped table-hover no-margin">
+          <thead>
+            <tr>
+              {headings.map(function (heading, i) {
+                var classNameVals = [
+                  'sort',
+                  ((sortKey === i) && reverse) ? 'asc' : '',
+                  ((sortKey === i) && !reverse) ? 'desc' : ''
+                ].join(' ');
+                function clicked() {
+                  setState({
+                    sortBy: i,
+                    reverse: i === sortKey ? (!reverse) : true
+                  });
+                }
+                return <th key={i} className={classNameVals} onClick={clicked}>{heading}</th>;
+              })}
+            </tr>
+          </thead>
+          <tbody>
+            {rows.map(function (row) {
+              return <tr key={row[0]}>{
+                row.map(function (value, i) {
+                  return <td className="col-md-2" key={i}>{value}</td>;
+                })}</tr>;
+            })}
+          </tbody>
+        </table>
+      </div>
+    );
+  }
+});
+
 var ComponentCounters = React.createClass({
   capitalize: function(astr) {
     if (astr) {
@@ -1013,8 +1429,8 @@
       if (this.props.info.comp_type === "bolt") {
         var capacity = (Number(tenMinAggregatedMetrics["Execute Count"][instance]));
         capacity *= (Number(tenMinAggregatedMetrics["Execute Latency (ms)"][instance]));
-        capacity /= (10 * 60 * 1000);
-        row.push(Number(capacity.toFixed(3)) || 0);
+        capacity = capacity * 100 / (10 * 60 * 1000);
+        row.push(Number((capacity.toFixed(2)) || 0) + "%");
       }
       if (pplan) {
         // Get Job url from pplan.