Add metrics for tracking bp caused by local instances and remote stmgrs (#3362)

diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 9a752de..047c634 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -67,13 +67,16 @@
 // Num bytes lost since instances is not connected
 const sp_string METRIC_BYTES_TO_INSTANCES_LOST = "__bytes_to_workers_lost";
 
+// Time spent in back pressure caused by instances managed by this stmgr.
+const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE =
+    "__time_spent_back_pressure_by_local_instance";
 // Time spent in back pressure aggregated - back pressure initiated by us +
 // others
 const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_AGGR = "__server/__time_spent_back_pressure_aggr";
 // Time spent in back pressure because of a component id. The comp id will be
-// appended
-// to the string below
+// appended to the string below
 const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_COMPID = "__time_spent_back_pressure_by_compid/";
+
 // Prefix for connection buffer's metrics
 const sp_string CONNECTION_BUFFER_BY_INSTANCEID = "__connection_buffer_by_instanceid/";
 // Prefix for connection buffer's length metrics. This is different
@@ -117,6 +120,9 @@
   metrics_manager_client_->register_metric("__server", instance_server_metrics_);
   metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR,
                                            back_pressure_metric_aggr_);
+  back_pressure_metric_caused_by_local_instances_ = make_shared<heron::common::TimeSpentMetric>();
+  metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE,
+                                           back_pressure_metric_caused_by_local_instances_);
   spouts_under_back_pressure_ = false;
 
   // Update queue related metrics every 10 seconds
@@ -180,6 +186,8 @@
 
   metrics_manager_client_->unregister_metric("__server");
   metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR);
+  metrics_manager_client_->unregister_metric(
+      METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE);
 
   // cleanup the instance info
   for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
@@ -549,6 +557,8 @@
   return "";
 }
 
+// This function is called when the buffer in the connection is full (the instance is not consuming
+// tuples fast enough).
 void InstanceServer::StartBackPressureConnectionCb(Connection* _connection) {
   // The connection will notify us when we can stop the back pressure
   _connection->setCausedBackPressure();
@@ -568,6 +578,8 @@
 
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStartBackPressureToOtherStMgrs();
+    // Start backpressure from local instances metric
+    back_pressure_metric_caused_by_local_instances_->Start();
   }
 
   // Indicate which instance component had back pressure
@@ -578,6 +590,7 @@
   StartBackPressureOnSpouts();
 }
 
+// This function is called when the buffer in the connection is empty (the tuples are drained).
 void InstanceServer::StopBackPressureConnectionCb(Connection* _connection) {
   _connection->unsetCausedBackPressure();
 
@@ -605,6 +618,8 @@
 
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStopBackPressureToOtherStMgrs();
+    // Stop backpressure from local instances metric
+    back_pressure_metric_caused_by_local_instances_->Stop();
   }
   AttemptStopBackPressureFromSpouts();
 }
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index b4a6038..e3a1b82 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -31,11 +31,11 @@
 
 namespace heron {
 namespace common {
+class AssignableMetric;
 class MetricsMgrSt;
 class MultiCountMetric;
-class TimeSpentMetric;
-class AssignableMetric;
 class MultiMeanMetric;
+class TimeSpentMetric;
 }
 }
 
@@ -196,6 +196,7 @@
   shared_ptr<heron::common::MetricsMgrSt> metrics_manager_client_;
   shared_ptr<heron::common::MultiCountMetric> instance_server_metrics_;
   shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_aggr_;
+  shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_caused_by_local_instances_;
 
   bool spouts_under_back_pressure_;
 
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.cpp b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
index 12e573e..f16afca 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.cpp
@@ -47,6 +47,10 @@
 const sp_string METRIC_FAIL_TUPLES_FROM_STMGRS = "__fail_tuples_from_stmgrs";
 // Bytes received from other stream managers
 const sp_string METRIC_BYTES_FROM_STMGRS = "__bytes_from_stmgrs";
+// Time spent in back pressure caused by remote stream managers.
+const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR =
+    "__time_spent_back_pressure_by_remote_stmgr";
+
 
 StMgrServer::StMgrServer(shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
                          const sp_string& _topology_name, const sp_string& _topology_id,
@@ -80,6 +84,10 @@
   bytes_from_stmgrs_metrics_ = make_shared<heron::common::CountMetric>();
   metrics_manager_client_->register_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS,
                                            bytes_from_stmgrs_metrics_);
+  back_pressure_metric_caused_by_remote_stmgr_ = make_shared<heron::common::TimeSpentMetric>();
+  metrics_manager_client_->register_metric(
+      SERVER_SCOPE + METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR,
+      back_pressure_metric_caused_by_remote_stmgr_);
 }
 
 StMgrServer::~StMgrServer() {
@@ -88,6 +96,8 @@
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_ACK_TUPLES_FROM_STMGRS);
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_FAIL_TUPLES_FROM_STMGRS);
   metrics_manager_client_->unregister_metric(SERVER_SCOPE + METRIC_BYTES_FROM_STMGRS);
+  metrics_manager_client_->unregister_metric(
+      SERVER_SCOPE + METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_REMOTE_STMGR);
 }
 
 void StMgrServer::HandleNewConnection(Connection* _conn) {
@@ -180,10 +190,12 @@
 void StMgrServer::StartBackPressureClientCb(const sp_string& _other_stmgr_id) {
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStartBackPressureToOtherStMgrs();
+    // Start backpressure from remote stmgr metric
+    back_pressure_metric_caused_by_remote_stmgr_->Start();
   }
   remote_ends_who_caused_back_pressure_.insert(_other_stmgr_id);
-  LOG(INFO) << "We observe back pressure on sending data to remote stream manager "
-            << _other_stmgr_id;
+  LOG(WARNING) << "We observe back pressure on sending data to remote stream manager "
+               << _other_stmgr_id;
   stmgr_->StartBackPressureOnSpouts();
 }
 
@@ -194,9 +206,11 @@
 
   if (!stmgr_->DidAnnounceBackPressure()) {
     stmgr_->SendStopBackPressureToOtherStMgrs();
+    // Stop backpressure from remote stmgr metric
+    back_pressure_metric_caused_by_remote_stmgr_->Stop();
   }
-  LOG(INFO) << "We don't observe back pressure now on sending data to remote "
-               "stream manager "
+  LOG(WARNING) << "We don't observe back pressure now on sending data to remote "
+                  "stream manager "
             << _other_stmgr_id;
   if (!stmgr_->DidAnnounceBackPressure() && !stmgr_->DidOthersAnnounceBackPressure()) {
     stmgr_->AttemptStopBackPressureFromSpouts();
diff --git a/heron/stmgr/src/cpp/manager/stmgr-server.h b/heron/stmgr/src/cpp/manager/stmgr-server.h
index 325f0c1..d0baf69 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-server.h
+++ b/heron/stmgr/src/cpp/manager/stmgr-server.h
@@ -31,8 +31,9 @@
 
 namespace heron {
 namespace common {
-class MetricsMgrSt;
 class CountMetric;
+class MetricsMgrSt;
+class TimeSpentMetric;
 }
 }
 
@@ -107,6 +108,7 @@
   shared_ptr<heron::common::CountMetric> ack_tuples_from_stmgrs_metrics_;
   shared_ptr<heron::common::CountMetric> fail_tuples_from_stmgrs_metrics_;
   shared_ptr<heron::common::CountMetric> bytes_from_stmgrs_metrics_;
+  shared_ptr<heron::common::TimeSpentMetric> back_pressure_metric_caused_by_remote_stmgr_;
 };
 
 }  // namespace stmgr